Uberized Tasks – Make MapReduce More Interactive

Compared with batch processing, interactive processing assumes that you get a response to your queries within a few seconds, or at most a few dozen seconds. It is well known that MapReduce is good at processing large volumes of data in jobs that take dozens of minutes or even several hours to complete, but it is not so good for interactive queries.

Even a simple MapReduce job that processes a small amount of data and has no reducer may take dozens of seconds in a busy production environment. The main reason for the slow performance is the significant overhead of launching tasks in YARN:

  • For each new MapReduce job, YARN allocates an ApplicationMaster container on one of the cluster nodes
  • The ApplicationMaster then asks the YARN ResourceManager to allocate one or more containers to run the MapReduce tasks

Let’s run a simple query that retrieves data from a single-row, single-column table and uses only 1 mapper (0 reducers):

SELECT 'A' FROM dual WHERE 1=1;

The cluster was quite busy at the time, so each container allocation took a while.

Excerpt from logs:

00:45:43,041 - Parsing command: select 'A' from dual where 1=1
00:45:44,184 - Starting command: select 'A' from dual where 1=1
00:45:45,232 - Connecting to ResourceManager (by client)
00:45:48,459 - Submitted application
00:45:52,148 - Created MRAppMaster
00:45:55,742 - Connecting to ResourceManager (by AM)
00:45:58,184 - ContainerLauncher - CONTAINER_REMOTE_LAUNCH
00:45:58,246 - Transitioned from ASSIGNED to RUNNING
00:46:01,195 - JVM given task
00:46:04,181 - Progress of TaskAttempt is : 0.0
00:46:04,595 - Progress of TaskAttempt is : 1.0
00:46:04,677 - Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.85 sec
00:46:06,820 - Ended Job
Time taken: 23.8 seconds, Fetched: 1 row(s)

The query took 23.8 seconds, and the overhead of running a small MapReduce task in YARN is huge. About 75% of the time was spent on initialization and resource allocation:

  • 1 second to parse the query (00:45:43 – 00:45:44)
  • 8 seconds to submit the query and launch the ApplicationMaster (00:45:44 – 00:45:52)
  • 6 seconds to initialize and launch the container for the Map task (00:45:52 – 00:45:58)
  • 3 seconds to initialize the JVM (00:45:58 – 00:46:01)
  • 6 seconds for the actual MapReduce work and cleanup (00:46:01 – 00:46:07)
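
The breakdown above can be recomputed directly from the log timestamps. The following is a minimal sketch, not part of Hive or Hadoop: the class name LaunchOverhead and its methods are hypothetical helpers, and it assumes that everything before the "JVM given task" line counts as launch overhead.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class LaunchOverhead {
    // The log writes a comma before the milliseconds, e.g. 00:45:43,041.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps taken on the same day. */
    public static long millisBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from, FMT),
                                LocalTime.parse(to, FMT)).toMillis();
    }

    /** Percentage of the whole run spent before the task JVM was given work. */
    public static double overheadPercent(String start, String taskStart, String end) {
        return 100.0 * millisBetween(start, taskStart) / millisBetween(start, end);
    }

    public static void main(String[] args) {
        // Timestamps copied from the log excerpt.
        String parsing   = "00:45:43,041"; // Parsing command
        String amCreated = "00:45:52,148"; // Created MRAppMaster
        String launched  = "00:45:58,184"; // CONTAINER_REMOTE_LAUNCH
        String taskGiven = "00:46:01,195"; // JVM given task
        String ended     = "00:46:06,820"; // Ended Job

        System.out.println("Parse, submit, start AM (ms): " + millisBetween(parsing, amCreated));
        System.out.println("Task container launch (ms):   " + millisBetween(amCreated, launched));
        System.out.println("Task JVM startup (ms):        " + millisBetween(launched, taskGiven));
        System.out.println("Map work and cleanup (ms):    " + millisBetween(taskGiven, ended));
        System.out.printf("Overhead: %.1f%%%n", overheadPercent(parsing, taskGiven, ended));
    }
}
```

For this log the overhead comes out at roughly 76% of the 23.8 seconds, in line with the figure quoted above.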

Uberized Tasks

As you can see, allocating containers to run MapReduce takes significant time, so for small jobs that deal with small volumes of data and apply only simple filters you can consider using Uberized tasks.

An Uber task means that the ApplicationMaster uses its own JVM to run the Map and Reduce tasks, so the tasks are executed sequentially on one node. In this case YARN has to allocate a container for the ApplicationMaster only.
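
To make the difference concrete, here is a small sketch of how many containers YARN has to allocate in each mode. It is only an illustration: the ContainerCount class is hypothetical, and it assumes that in regular mode every task gets its own container and that no task is retried or speculatively re-run, so the regular-mode figure is a lower bound.

```java
public class ContainerCount {
    /**
     * Minimum number of containers YARN must allocate for one MapReduce job.
     * Uber mode: only the ApplicationMaster container.
     * Regular mode: the ApplicationMaster plus one container per task
     * (assumption: no retries and no speculative attempts).
     */
    public static int containersNeeded(int mapTasks, int reduceTasks, boolean uber) {
        if (uber) {
            return 1;
        }
        return 1 + mapTasks + reduceTasks;
    }

    public static void main(String[] args) {
        // The sample query uses 1 mapper and 0 reducers.
        System.out.println("Regular mode: " + containersNeeded(1, 0, false));
        System.out.println("Uber mode:    " + containersNeeded(1, 0, true));
    }
}
```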

To enable Uber tasks you have to set the following option in Hive:

set mapreduce.job.ubertask.enable=true;

But that may not be enough. Your MapReduce tasks must also require no more memory than the value of the yarn.app.mapreduce.am.resource.mb option (normally set in mapred-site.xml). This option defines how much memory the MR ApplicationMaster needs and has a default value of 1536 MB.

So make sure that mapreduce.map.memory.mb and mapreduce.reduce.memory.mb (even if you do not have a reduce step!) are not greater than yarn.app.mapreduce.am.resource.mb. If they are, use the SET statement to lower the amount of memory for Map and Reduce tasks in Hive:

set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

For more information on how MapReduce decides whether it can run a job in uberized mode, see the makeUberDecision() method of the org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl class:

long sysMemSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_VMEM_MB, 
                                         MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
...
boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
...
boolean smallMemory = ((Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0), 
                                 conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
            <= sysMemSizeForUberSlot) || 
     (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
...
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks && 
         smallInput && smallMemory && smallCpu && notChainJob; 
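
The memory part of this decision can be tried out in isolation. The sketch below is a simplified stand-alone restatement of the smallMemory condition from the excerpt, not the actual Hadoop code: UberMemoryCheck is a hypothetical class, the values are passed in as plain numbers instead of being read from a job configuration, and the value -1 for the disabled limit is an assumption about JobConf.DISABLED_MEMORY_LIMIT.

```java
public class UberMemoryCheck {
    // Assumption: stands in for JobConf.DISABLED_MEMORY_LIMIT (believed to be -1).
    static final long DISABLED_MEMORY_LIMIT = -1L;

    /**
     * True when the larger of the map and reduce memory requests fits into the
     * ApplicationMaster memory, or when the ApplicationMaster limit is disabled.
     * The reduce value counts even when the job has no reducers.
     */
    public static boolean smallMemory(long mapMemoryMb, long reduceMemoryMb, long amMemoryMb) {
        return Math.max(mapMemoryMb, reduceMemoryMb) <= amMemoryMb
                || amMemoryMb == DISABLED_MEMORY_LIMIT;
    }

    public static void main(String[] args) {
        long amMemoryMb = 1536; // default of yarn.app.mapreduce.am.resource.mb

        // Both task sizes lowered to 1000 MB: fits.
        System.out.println(smallMemory(1000, 1000, amMemoryMb));
        // Reduce memory left at a larger (illustrative) 2048 MB: does not fit,
        // even for a map-only job.
        System.out.println(smallMemory(1000, 2048, amMemoryMb));
    }
}
```

Because the comparison is "less than or equal", task memory exactly equal to the ApplicationMaster memory still passes this check.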

Now if you run the query again, it will be executed in Uberized mode:

set mapreduce.job.ubertask.enable=true;
set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

SELECT 'A' FROM dual WHERE 1=1;

This job requires allocation of the ApplicationMaster container only, which can save you some time when running queries.
