Performance Issues Using ORDER to Reduce the Number of Out Files – Apache Pig 0.16 Amazon EMR

Often you have a simple ETL process (a Pig job in our example) that just applies a filter to the source data, performs some calculations and saves the result to a target table.

So it is a Map-only job (no Reduce phase is required) that generates N output files, where N is the number of map tasks.

For example:

set mapreduce.output.fileoutputformat.compress true

-- Read data from a Hive table
data_all = LOAD 'dmtolpeko.mcp' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Filter selects less than 2% of rows from the source table
data_filtered = FILTER data_all BY event_name == 'CLIENT_LOGIN'; 

-- Define out columns and save the results to S3
data_out = FOREACH data_filtered GENERATE app_id, payload, event_timestamp;
STORE data_out INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';   

From the log you can see that the job used 231 mappers:

INFO mapred.FileInputFormat: Total input paths to process : 250
INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil  - Total input paths (combined) to process : 231
INFO util.MapRedUtil: Total input paths (combined) to process : 231
INFO mapreduce.JobSubmitter: number of splits:231

And we can see that the output contains 231 data files (_SUCCESS is an empty file):

aws s3 ls s3://epic/hive/dmtolpeko.db/mcp_client_login/ --summarize

2018-03-01 22:22:33          0 _SUCCESS
2018-03-01 22:22:30      11165 part-m-00000.gz
2018-03-01 22:22:29      11107 part-m-00001.gz
2018-03-01 22:22:30      11346 part-m-00002.gz
...
2018-03-01 22:22:27       5697 part-m-00228.gz
2018-03-01 22:22:26       5686 part-m-00229.gz
2018-03-01 22:22:28       5480 part-m-00230.gz

Total Objects: 232
   Total Size: 1396533

This job produced a large number of very small files. What can we do to reduce the number of output files?
Continue reading

Uberized Tasks – Make MapReduce More Interactive

Compared with batch processing, interactive processing assumes that you get response to your queries within a few seconds or at least a few dozens of seconds. It is well-known that MapReduce is good for processing large volumes of data that takes dozens of minutes or even several hours to complete, but it is not so good for interactive queries.

Even a simple MapReduce job that processes a small amount of data, and even does not have a reducer may take dozens of seconds in a busy production environment. The main reason of slow performance is the significant overhead related to launching tasks in YARN:

  • For each new MapReduce job YARN allocates an ApplicationMaster container on one of the cluster nodes
  • ApplicationMaster requests YARN ResourceManager to allocate one or more containers to run MapReduce job

Let’s run a simple query that retrieves data from a single row, single column table and uses only 1 mapper (0 reducers):

SELECT 'A' FROM dual WHERE 1=1;

You can see that the cluster is quite busy, so each container allocation takes time:

uber

Excerpt from logs:

00:45:43,041 - Parsing command: select 'A' from dual where 1=1
00:45:44,184 - Starting command: select 'A' from dual where 1=1
00:45:45,232 - Connecting to ResourceManager (by client)
00:45:48,459 - Submitted application
00:45:52,148 - Created MRAppMaster
00:45:55,742 - Connecting to ResourceManager (by AM)
00:45:58,184 - ContainerLauncher - CONTAINER_REMOTE_LAUNCH
00:45:58,246 - Transitioned from ASSIGNED to RUNNING
00:46:01,195 - JVM given task
00:46:04,181 - Progress of TaskAttempt is : 0.0
00:46:04,595 - Progress of TaskAttempt is : 1.0
00:46:04,677 - Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.85 sec
00:46:06,820 - Ended Job
Time taken: 23.8 seconds, Fetched: 1 row(s)

You can see that the query took 23.8 seconds, and the overhead to run a small MapReduce task in YARN is huge, 75% of time was spent on initialization and resource allocation:

  • 1 second to parse the query
  • 9 seconds to submitting the query and launching ApplicationMaster (00:45:43 – 00:45:52)
  • 6 seconds to initialize and launch the container for Map task (00:45:52 – 00:45:58)
  • 3 seconds to initialize JVM (00:45:58 – 00:46:01)
  • 6 seconds for actual MapReduce and cleanup (00:46:01 – 00:46:07)

Uberized Tasks

As you can see the allocation of containers to perform MapReduce takes significant time, so for small tasks that deal with small volumes of data and apply only simple filters you can consider using Uberized tasks.

An Uber task means that the ApplicationMaster uses its own JVM to run Map and Reduce tasks, so the tasks are executed sequentially on one node. In this case YARN has to allocate a container for ApplicationMaster only.

To enable Uber tasks you have to set the following option in Hive:

set mapreduce.job.ubertask.enable=true;

But it maybe not enough. It is also required that your MapReduce task requires less memory that defined by yarn.app.mapreduce.am.resource.mb option in YARN configuration. This option defines how much memory MR ApplicationMaster needs and has the default value of 1536 MB.

So make sure that mapreduce.map.memory.mb and mapreduce.reduce.memory.mb (even if you do not have a reduce step!) have values less than yarn.app.mapreduce.am.resource.mb. If not, use SET statement to change the amount of memory for Map and Reduce tasks in Hive:

set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

For more information how MapReduce defines whether it can run a task in uberized mode, see the method makeUberDecision() of org.apache.hadoop.mapreduce.v2.app.job.imp.JobImpl class:

long sysMemSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_VMEM_MB, 
                                         MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
...
boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
...
boolean smallMemory = ((Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0), 
                                 conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
            <= sysMemSizeForUberSlot) || 
     (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
...
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks && 
         smallInput && smallMemory && smallCpu && notChainJob; 

Now if you run the query again it will be executed in Uberized mode:

set mapreduce.job.ubertask.enable=true;
set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

SELECT 'A' FROM dual WHERE 1=1;

This task requires allocation of ApplicationMaster container only that can save you some time running queries.

Invoking Stateful UDF at Map and Reduce Side in Hive

Typically a scalar UDF processes one or more columns from the current row only. For example, UPPER(col) function does not need to store the result of the previous call or access previous rows.

How to write and correctly call a function that needs to store its state? For example, let’s write a ROWNUM function that assigns sequential numbers for all rows in the table, no matter how many mappers were launched to read the data.

The simplest possible UDF:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

@UDFType(deterministic = false)
public class Rownum2 extends UDF {
  private Long rowNum = 0L;

  public Long evaluate() {
    rowNum++;
    return rowNum;
  }
}

Note that it is important to specify @UDFType(deterministic = false) otherwise the function called only once. Compile the program and create jar:

javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Rownum2.java
jar cf Rownum2.jar Rownum2.class

Now launch Hive and create the function:

ADD JAR /home/v-dtolpeko/Rownum2.jar;
CREATE TEMPORARY FUNCTION rownum2 AS 'Rownum2';

The ROWNUM2 function is available until the end of the current session.

Map-Side UDF Execution

Let’s now run the following query on a sample table having over 30M rows:

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM channel;

It is important to notice how many mappers were launched for the query:

Hadoop job information for Stage-1: number of mappers: 7; number of reducers: 0

Now you can define how many rows are in the table, what is the maximum and minimum values in RN column:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel;
32093903  1        6873300

You can see that the assigned maximum sequential number is not equal to the total number of rows in the table. Investigating further:

SELECT rn, COUNT(*) FROM channel GROUP BY rn LIMIT 10;
1       7
2       7
3       7
4       7
5       7
6       7
7       7
8       7
9       7
10      7

You can see that the number of times the same row number was assigned is equal to the number of mappers i.e ROWNUM function was executed at the map side. EXPLAIN statement confirms this:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM channel;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
        Select Operator
          expressions: name (type: string), code (type: string), rownum2() (type: bigint)

Reduce-Side UDF Execution

So how to assign unique row numbers for the entire table? Let’s see how the following query works:

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;

I used constant 1 in DISTRIBUTE BY and SORT BY since there are no partitions in the table, and I want to assign unique row numbers for the entire set.

The query runs much longer as all rows are sent to the single reducer now. Now collect some statistics:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel;
32093903        1       32093903

Now the assigned maximum sequential number is equal to the total number of rows in the table. EXPLAIN statement output is as follows:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
          Select Operator
            expressions: name (type: string), code (type: string), 
              Reduce Output Operator
                key expressions: 1 (type: int)
                sort order: +
                Map-reduce partition columns: 1 (type: int)
    Reduce Operator Tree:
      Extract
        Select Operator
          expressions: _col0 (type: string), _col1 (type: string), rownum2() (type: bigint)

ROWNUM function is now called at the Reduce side that ensures row numbers are unique within the entire table (since we have only one reducer for the query). It is important to note that Hive still uses a single MapReduce job and just moved the function call to Reduce side, it did not run one MapReduce for subquery and another for the outer query.