Uberized Tasks – Make MapReduce More Interactive

Compared with batch processing, interactive processing assumes that you get a response to your queries within a few seconds or at most a few dozen seconds. It is well known that MapReduce is good at processing large volumes of data in jobs that take dozens of minutes or even several hours to complete, but it is not so good for interactive queries.

Even a simple MapReduce job that processes a small amount of data and does not even have a reducer may take dozens of seconds in a busy production environment. The main reason for the slow performance is the significant overhead of launching tasks in YARN:

  • For each new MapReduce job, YARN allocates an ApplicationMaster container on one of the cluster nodes
  • The ApplicationMaster then requests the YARN ResourceManager to allocate one or more containers to run the job's map and reduce tasks

Let’s run a simple query that retrieves data from a single-row, single-column table and uses only 1 mapper (0 reducers):

SELECT 'A' FROM dual WHERE 1=1;

The cluster is quite busy, so each container allocation takes time.

Excerpt from the logs:

00:45:43,041 - Parsing command: select 'A' from dual where 1=1
00:45:44,184 - Starting command: select 'A' from dual where 1=1
00:45:45,232 - Connecting to ResourceManager (by client)
00:45:48,459 - Submitted application
00:45:52,148 - Created MRAppMaster
00:45:55,742 - Connecting to ResourceManager (by AM)
00:45:58,184 - ContainerLauncher - CONTAINER_REMOTE_LAUNCH
00:45:58,246 - Transitioned from ASSIGNED to RUNNING
00:46:01,195 - JVM given task
00:46:04,181 - Progress of TaskAttempt is : 0.0
00:46:04,595 - Progress of TaskAttempt is : 1.0
00:46:04,677 - Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.85 sec
00:46:06,820 - Ended Job
Time taken: 23.8 seconds, Fetched: 1 row(s)

The query took 23.8 seconds, and the overhead of running a small MapReduce task in YARN is huge. About 75% of the time was spent on initialization and resource allocation:

  • 1 second to parse the query (00:45:43 – 00:45:44)
  • 8 seconds to submit the query and launch the ApplicationMaster (00:45:44 – 00:45:52)
  • 6 seconds to initialize and launch the container for the map task (00:45:52 – 00:45:58)
  • 3 seconds to initialize the JVM (00:45:58 – 00:46:01)
  • 6 seconds for the actual MapReduce processing and cleanup (00:46:01 – 00:46:07)
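The breakdown can be rechecked directly from the log timestamps. The following Java sketch uses a hypothetical helper class, LogPhases (not part of Hadoop or Hive), that subtracts the timestamps copied from the excerpt. Treating everything before the JVM receives the task as overhead is an assumption about where to draw the line.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper: recomputes the phase durations from the log excerpt.
class LogPhases {

  // Seconds between two log timestamps in "HH:mm:ss,SSS" format (same day assumed)
  static double seconds(String from, String to) {
    LocalTime start = LocalTime.parse(from.replace(',', '.'));
    LocalTime end = LocalTime.parse(to.replace(',', '.'));
    return Duration.between(start, end).toMillis() / 1000.0;
  }

  public static void main(String[] args) {
    // Phase boundaries copied from the log excerpt
    String[] marks = {"00:45:43,041", "00:45:44,184", "00:45:52,148",
                      "00:45:58,184", "00:46:01,195", "00:46:06,820"};
    String[] phases = {"parse", "submit + start AM", "launch map container",
                       "JVM start", "map + cleanup"};
    for (int i = 0; i < phases.length; i++) {
      System.out.printf("%-22s %5.1f s%n", phases[i], seconds(marks[i], marks[i + 1]));
    }
    double total = seconds(marks[0], marks[5]);
    // Assumption: everything before the JVM is given the task counts as overhead
    double overhead = seconds(marks[0], marks[4]);
    System.out.printf("total %.1f s, overhead %.0f%%%n", total, 100 * overhead / total);
  }
}
```

With these boundaries the overhead comes to about 18 of the 23.8 seconds (roughly 76%), in line with the estimate above.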

Uberized Tasks

As you can see, allocating containers for MapReduce takes significant time, so for small jobs that deal with small volumes of data and apply only simple filters, you can consider using uberized tasks.

In an uber task, the ApplicationMaster runs the map and reduce tasks in its own JVM, so the tasks are executed sequentially on one node. In this case YARN has to allocate a container for the ApplicationMaster only.

To enable uber tasks, set the following option in Hive:

set mapreduce.job.ubertask.enable=true;

But that may not be enough. Your map and reduce tasks must also require no more memory than defined by the yarn.app.mapreduce.am.resource.mb option. This option defines how much memory the MR ApplicationMaster needs and has a default value of 1536 MB.

So make sure that mapreduce.map.memory.mb and mapreduce.reduce.memory.mb (even if you do not have a reduce step!) have values no greater than yarn.app.mapreduce.am.resource.mb. If they do not, use the SET statement to change the amount of memory for map and reduce tasks in Hive:

set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

For more information on how MapReduce decides whether it can run a job in uberized mode, see the makeUberDecision() method of the org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl class:

long sysMemSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_VMEM_MB, 
                                         MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
...
boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
...
boolean smallMemory = ((Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0), 
                                 conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
            <= sysMemSizeForUberSlot) || 
     (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
...
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks && 
         smallInput && smallMemory && smallCpu && notChainJob; 
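To make these conditions concrete, here is a simplified standalone model of the checks above. It is not Hadoop code: the class UberDecisionSketch is hypothetical, the CPU, chain-job and DISABLED_MEMORY_LIMIT checks are left out, and the thresholds are assumptions based on the usual defaults (mapreduce.job.ubertask.maxmaps = 9, mapreduce.job.ubertask.maxreduces = 1, and mapreduce.job.ubertask.maxbytes falling back to the HDFS block size, taken here as 128 MB).

```java
// Simplified, standalone model of the checks in JobImpl.makeUberDecision().
// Not Hadoop code: thresholds below are assumed defaults, and the CPU,
// chain-job and DISABLED_MEMORY_LIMIT checks are omitted.
class UberDecisionSketch {
  static final long AM_MEMORY_MB = 1536;             // yarn.app.mapreduce.am.resource.mb default
  static final int MAX_MAPS = 9;                     // mapreduce.job.ubertask.maxmaps (assumed default)
  static final int MAX_REDUCES = 1;                  // mapreduce.job.ubertask.maxreduces (assumed default)
  static final long MAX_BYTES = 128L * 1024 * 1024;  // maxbytes falls back to block size (128 MB assumed)

  static boolean isUber(boolean uberEnabled, int numMaps, int numReduces,
                        long inputBytes, long mapMemoryMb, long reduceMemoryMb) {
    boolean smallNumMapTasks = numMaps <= MAX_MAPS;
    boolean smallNumReduceTasks = numReduces <= MAX_REDUCES;
    boolean smallInput = inputBytes <= MAX_BYTES;
    // Both settings count, even for a map-only job; equality is allowed ("<=")
    boolean smallMemory = Math.max(mapMemoryMb, reduceMemoryMb) <= AM_MEMORY_MB;
    return uberEnabled && smallNumMapTasks && smallNumReduceTasks
        && smallInput && smallMemory;
  }

  public static void main(String[] args) {
    // Map-only job over a tiny table, before and after lowering the reduce memory
    System.out.println(isUber(true, 1, 0, 1024, 1000, 2048)); // false
    System.out.println(isUber(true, 1, 0, 1024, 1000, 1000)); // true
  }
}
```

The first call shows why the reduce memory setting matters even for a job with 0 reducers: the check takes the maximum of both values.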

Now if you run the query again, it is executed in uberized mode:

set mapreduce.job.ubertask.enable=true;
set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

SELECT 'A' FROM dual WHERE 1=1;

This job requires allocating only the ApplicationMaster container, which can save you some time when running small queries.

Invoking Stateful UDF at Map and Reduce Side in Hive

Typically, a scalar UDF processes one or more columns of the current row only. For example, the UPPER(col) function does not need to store the result of the previous call or access previous rows.

How do you write and correctly call a function that needs to keep its state? For example, let’s write a ROWNUM function that assigns sequential numbers to all rows in the table, no matter how many mappers are launched to read the data.

The simplest possible UDF:

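import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

// deterministic = false: Hive must call evaluate() for every row
// instead of treating the result as a constant
@UDFType(deterministic = false)
public class Rownum2 extends UDF {
  // Counter kept in the UDF instance; each task that runs the UDF has its own instance
  private long rowNum = 0;

  // Returns the next sequential number on every call
  public Long evaluate() {
    rowNum++;
    return rowNum;
  }
}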

Note that it is important to specify @UDFType(deterministic = false); otherwise the function is called only once. Compile the program and create a JAR:

javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Rownum2.java
jar cf Rownum2.jar Rownum2.class

Now launch Hive and create the function:

ADD JAR /home/v-dtolpeko/Rownum2.jar;
CREATE TEMPORARY FUNCTION rownum2 AS 'Rownum2';

The ROWNUM2 function is available until the end of the current session.

Map-Side UDF Execution

Let’s now run the following query on a sample table with over 30M rows:

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM channel;

Note how many mappers were launched for the query:

Hadoop job information for Stage-1: number of mappers: 7; number of reducers: 0

Now let’s check how many rows are in the table and what the minimum and maximum values in the RN column are:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel2;
32093903  1        6873300

You can see that the assigned maximum sequential number is not equal to the total number of rows in the table. Investigating further:

SELECT rn, COUNT(*) FROM channel2 GROUP BY rn LIMIT 10;
1       7
2       7
3       7
4       7
5       7
6       7
7       7
8       7
9       7
10      7

The same row number was assigned 7 times, which equals the number of mappers, i.e. the ROWNUM2 function was executed on the map side and each mapper numbered its rows independently, starting from 1. The EXPLAIN statement confirms this:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM channel;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
        Select Operator
          expressions: name (type: string), code (type: string), rownum2() (type: bigint)
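The duplicates follow from each map task having its own instance of the UDF, so every instance starts counting from 1. The small simulation below illustrates this; the nested Counter class is a plain Java stand-in for Rownum2 (to avoid a Hive dependency), and the split sizes are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Simulates Rownum2 running on the map side: each mapper gets its own
// UDF instance, so each one starts counting from 1.
class MapSideRownum {
  // Stand-in for the Rownum2 UDF (no Hive dependency)
  static class Counter {
    private long rowNum = 0;

    long evaluate() {
      return ++rowNum;
    }
  }

  // splitSizes[i] = number of rows read by mapper i (hypothetical sizes);
  // returns how many times each row number was assigned
  static Map<Long, Integer> assign(int[] splitSizes) {
    Map<Long, Integer> timesAssigned = new TreeMap<>();
    for (int rows : splitSizes) {
      Counter udf = new Counter();  // a new instance per map task
      for (int r = 0; r < rows; r++) {
        timesAssigned.merge(udf.evaluate(), 1, Integer::sum);
      }
    }
    return timesAssigned;
  }

  public static void main(String[] args) {
    // Three mappers reading 5, 3 and 4 rows
    System.out.println(assign(new int[] {5, 3, 4})); // {1=3, 2=3, 3=3, 4=2, 5=1}
  }
}
```

Small row numbers appear once per mapper, while the largest ones appear only in the biggest split, which matches the MIN/MAX statistics above.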

Reduce-Side UDF Execution

So how do you assign unique row numbers across the entire table? Let’s see how the following query works:

DROP TABLE IF EXISTS channel2;
CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;

I used the constant 1 in DISTRIBUTE BY and SORT BY because there are no partitions in the table and I want to assign unique row numbers across the entire set.

The query now runs much longer because all rows are sent to a single reducer. Let’s collect the statistics again:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel2;
32093903        1       32093903

Now the maximum assigned row number equals the total number of rows in the table. The EXPLAIN output is as follows:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
          Select Operator
            expressions: name (type: string), code (type: string), 
              Reduce Output Operator
                key expressions: 1 (type: int)
                sort order: +
                Map-reduce partition columns: 1 (type: int)
    Reduce Operator Tree:
      Extract
        Select Operator
          expressions: _col0 (type: string), _col1 (type: string), rownum2() (type: bigint)

The ROWNUM2 function is now called on the reduce side, which ensures that row numbers are unique across the entire table (since there is only one reducer for the query). Note that Hive still uses a single MapReduce job: it just moved the function call to the reduce side instead of running one MapReduce job for the subquery and another for the outer query.
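Why does DISTRIBUTE BY 1 put everything on one reducer? Hadoop's default HashPartitioner sends a key to reducer (hashCode & Integer.MAX_VALUE) % numReduceTasks, so all rows with the same key go to the same reducer. Hive hashes the DISTRIBUTE BY columns itself; the sketch below uses Integer.hashCode() as a stand-in, which is an assumption, but the conclusion does not depend on the exact hash function: a constant key always maps to a single reducer, and that reducer's single UDF instance numbers all rows. The class name ConstantKeyPartitioning is hypothetical.

```java
import java.util.Arrays;

// Sketch: where rows go when the DISTRIBUTE BY key is a constant.
// Uses the HashPartitioner formula with Integer.hashCode() as a stand-in
// for Hive's own key hashing (an assumption).
class ConstantKeyPartitioning {
  static int partition(Object key, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  // Counts how many of numRows rows each reducer receives for DISTRIBUTE BY 1
  static int[] rowsPerReducer(int numRows, int numReduceTasks) {
    int[] counts = new int[numReduceTasks];
    Integer distributeKey = 1;  // the same constant for every row
    for (int row = 0; row < numRows; row++) {
      counts[partition(distributeKey, numReduceTasks)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // Even with 4 reducers available, one reducer receives all 10 rows,
    // so a single Rownum2 instance numbers them 1..10 without duplicates.
    System.out.println(Arrays.toString(rowsPerReducer(10, 4))); // [0, 10, 0, 0]
  }
}
```

The price of this guarantee is the one noted above: a single reducer has to process the whole table.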

Map-side Aggregation in Hive

In MapReduce, you usually use map tasks to filter out records and reduce tasks to perform aggregations. But when you aggregate a large number of rows, sending them all to reducers can lead to very high network I/O. Fortunately, Hive can perform map-side aggregation whenever possible.

Consider a query to calculate the total number of orders per store and product:

SELECT store, product, count(*) AS cnt 
FROM orders
GROUP BY store, product;

Without map-side aggregation, all rows have to be sent to reducers, causing high network I/O. The hive.map.aggr option, set to true by default, specifies whether Hive uses map-side aggregation in GROUP BY queries.
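The idea can be modelled in a few lines of Java. In this sketch (a simplification, not Hive's GroupByOperator), each map task keeps a hash table of partial counts keyed by store and product and emits one row per distinct key, and the reducer sums the partial counts. The class name, the "store|product" key format and the split contents are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of map-side aggregation for
// SELECT store, product, count(*) FROM orders GROUP BY store, product
class MapSideAggregation {
  // Map task: partial count(*) per "store|product" key within one input split
  static Map<String, Long> mapTask(List<String> keys) {
    Map<String, Long> partial = new HashMap<>();
    for (String key : keys) {
      partial.merge(key, 1L, Long::sum);
    }
    return partial;  // one output row per distinct key in the split
  }

  // Reduce task: sums the partial counts produced by all map tasks
  static Map<String, Long> reduceTask(List<Map<String, Long>> mapOutputs) {
    Map<String, Long> totals = new TreeMap<>();
    for (Map<String, Long> output : mapOutputs) {
      output.forEach((key, count) -> totals.merge(key, count, Long::sum));
    }
    return totals;
  }

  public static void main(String[] args) {
    List<String> split1 = Arrays.asList("s1|p1", "s1|p1", "s1|p2", "s2|p1");
    List<String> split2 = Arrays.asList("s1|p1", "s2|p1", "s2|p1");
    List<Map<String, Long>> mapOutputs = Arrays.asList(mapTask(split1), mapTask(split2));
    int shuffledRows = mapOutputs.stream().mapToInt(Map::size).sum();
    System.out.println("map input rows: 7, map output rows: " + shuffledRows); // 5
    System.out.println(reduceTask(mapOutputs)); // {s1|p1=3, s1|p2=1, s2|p1=3}
  }
}
```

The more often a key repeats within a split, the fewer rows cross the network; the same key can still arrive at the reducer from several mappers, which is why the reduce step is still needed.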

Here are some statistics from executing the query on a sample orders table:


The query does not contain a WHERE clause, and while the number of input rows is 1,075,477,972 (Map input records), the number of map output rows is just 7,370,737. That’s quite a good result: the map tasks reduced the number of rows from about 1 billion to about 7.4 million.

Only 7,370,737 rows have to be sent to the reducers, which finally reduce them to 1,657,744 rows.

To perform map-side aggregation, Hive uses an in-memory hash table to hold the aggregate values. Besides hive.map.aggr, Hive offers the following options to configure map-side aggregation:

  • hive.map.aggr.hash.force.flush.memory.threshold (Default: 0.9) – When the memory used by map-side aggregation exceeds this threshold (a fraction of the available memory), the hash table is flushed. This means that a map task can produce multiple partial counts for the same key.
  • hive.map.aggr.hash.percentmemory (Default: 0.5) – Fraction of the total map task memory that can be used for the hash table.
  • hive.map.aggr.hash.min.reduction (Default: 0.5) – If the ratio of the hash table size to the number of input rows is greater than this value, map-side aggregation is turned off. If the input keys are unique (or nearly unique) within each map task, map-side aggregation brings no benefit.
  • hive.groupby.mapaggr.checkinterval (Default: 100,000) – After processing this number of rows, Hive checks the number of entries in the hash table.
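The interplay of these settings can be sketched as follows. This is a simplified model, not Hive's implementation: the class name MapAggrHashTable is hypothetical, the memory thresholds are modelled as a maximum number of hash table entries, a flush emits the whole table, and once aggregation is turned off each row is simply passed through with a count of 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a map-side GROUP BY count(*) hash table (not Hive code).
// Assumptions: memory limits are modelled as maxEntries, a flush emits the
// whole table, and once aggregation is off each row is passed through as-is.
class MapAggrHashTable {
  private final int checkInterval;    // cf. hive.groupby.mapaggr.checkinterval
  private final double minReduction;  // cf. hive.map.aggr.hash.min.reduction
  private final int maxEntries;       // stand-in for the memory thresholds
  private final Map<String, Long> table = new HashMap<>();
  private final List<String> output = new ArrayList<>();  // "key=count" rows sent to reducers
  private long rowsIn = 0;
  private boolean hashAggr = true;

  MapAggrHashTable(int checkInterval, double minReduction, int maxEntries) {
    this.checkInterval = checkInterval;
    this.minReduction = minReduction;
    this.maxEntries = maxEntries;
  }

  void process(String key) {
    rowsIn++;
    if (!hashAggr) {
      output.add(key + "=1");  // aggregation is off: no local reduction
      return;
    }
    table.merge(key, 1L, Long::sum);
    if (rowsIn % checkInterval == 0 && table.size() > rowsIn * minReduction) {
      flush();                 // keys are too unique to benefit from hashing
      hashAggr = false;
    } else if (table.size() >= maxEntries) {
      flush();                 // the same key may be emitted again later
    }
  }

  private void flush() {
    table.forEach((key, count) -> output.add(key + "=" + count));
    table.clear();
  }

  List<String> close() {
    flush();
    return output;
  }

  boolean isHashAggr() {
    return hashAggr;
  }

  public static void main(String[] args) {
    MapAggrHashTable t = new MapAggrHashTable(4, 0.5, 100);
    for (String key : new String[] {"a", "b", "c", "d", "e"}) {
      t.process(key);
    }
    // All keys were unique, so aggregation was turned off after 4 rows
    System.out.println(t.isHashAggr() + " " + t.close().size()); // false 5
  }
}
```

Feeding it unique keys switches aggregation off at the first check, while a small maxEntries makes the same key appear in the output more than once, which is why the reducers must still combine partial counts.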