Uberized Tasks – Make MapReduce More Interactive

Compared with batch processing, interactive processing assumes that you get a response to your queries within a few seconds, or at least within a few dozen seconds. It is well known that MapReduce is good for processing large volumes of data in jobs that take dozens of minutes or even several hours to complete, but it is not so good for interactive queries.

Even a simple MapReduce job that processes a small amount of data and does not even have a reducer may take dozens of seconds in a busy production environment. The main reason for the slow performance is the significant overhead of launching tasks in YARN:

  • For each new MapReduce job, YARN allocates an ApplicationMaster container on one of the cluster nodes
  • The ApplicationMaster then asks the YARN ResourceManager to allocate one or more containers to run the MapReduce job

Let’s run a simple query that retrieves data from a single-row, single-column table and uses only 1 mapper (0 reducers):

SELECT 'A' FROM dual WHERE 1=1;
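Note that dual is not a built-in Hive table. If you want to reproduce the test, a minimal single-row, single-column table can be created along the following lines (the column name and file path here are only an illustration, not taken from the original environment):

-- illustrative column name and path; any one-row table will do
CREATE TABLE dual (dummy STRING);
LOAD DATA LOCAL INPATH '/tmp/dual.txt' OVERWRITE INTO TABLE dual;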

You can see that the cluster is quite busy, so each container allocation takes time:

[Screenshot: cluster resource usage showing a busy cluster]

Excerpt from logs:

00:45:43,041 - Parsing command: select 'A' from dual where 1=1
00:45:44,184 - Starting command: select 'A' from dual where 1=1
00:45:45,232 - Connecting to ResourceManager (by client)
00:45:48,459 - Submitted application
00:45:52,148 - Created MRAppMaster
00:45:55,742 - Connecting to ResourceManager (by AM)
00:45:58,184 - ContainerLauncher - CONTAINER_REMOTE_LAUNCH
00:45:58,246 - Transitioned from ASSIGNED to RUNNING
00:46:01,195 - JVM given task
00:46:04,181 - Progress of TaskAttempt is : 0.0
00:46:04,595 - Progress of TaskAttempt is : 1.0
00:46:04,677 - Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.85 sec
00:46:06,820 - Ended Job
Time taken: 23.8 seconds, Fetched: 1 row(s)

You can see that the query took 23.8 seconds, and that the overhead of running a small MapReduce task in YARN is huge: about 75% of the time (roughly 18 of the 23.8 seconds, from 00:45:43 to 00:46:01) was spent on initialization and resource allocation:

  • 1 second to parse the query
  • 9 seconds to submit the query and launch the ApplicationMaster (00:45:43 – 00:45:52)
  • 6 seconds to initialize and launch the container for the Map task (00:45:52 – 00:45:58)
  • 3 seconds to initialize the JVM (00:45:58 – 00:46:01)
  • 6 seconds for the actual MapReduce work and cleanup (00:46:01 – 00:46:07)

Uberized Tasks

As you can see, the allocation of containers to perform MapReduce takes significant time, so for small tasks that deal with small volumes of data and apply only simple filters you can consider using Uberized tasks.

An Uber task means that the ApplicationMaster uses its own JVM to run the Map and Reduce tasks, so the tasks are executed sequentially on one node. In this case YARN only has to allocate a container for the ApplicationMaster.

To enable Uber tasks you have to set the following option in Hive:

set mapreduce.job.ubertask.enable=true;

But that may not be enough. It is also required that your Map and Reduce tasks request less memory than defined by the yarn.app.mapreduce.am.resource.mb option in the YARN configuration. This option defines how much memory the MapReduce ApplicationMaster needs and has a default value of 1536 MB.

So make sure that mapreduce.map.memory.mb and mapreduce.reduce.memory.mb (even if you do not have a reduce step!) have values less than yarn.app.mapreduce.am.resource.mb. If not, use the SET statement to change the amount of memory for the Map and Reduce tasks in Hive:

set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

For more information on how MapReduce decides whether it can run a job in Uberized mode, see the makeUberDecision() method of the org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl class:

// yarn.app.mapreduce.am.resource.mb (default 1536 MB)
long sysMemSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_VMEM_MB,
                                         MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
...
// mapreduce.job.ubertask.enable
boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
...
// Map and Reduce task memory must fit into the ApplicationMaster container
boolean smallMemory = ((Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
                                 conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
                        <= sysMemSizeForUberSlot) ||
                       (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
...
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks &&
         smallInput && smallMemory && smallCpu && notChainJob;
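
Besides memory, the uber decision also depends on the number of map and reduce tasks and on the input size (smallNumMapTasks, smallNumReduceTasks and smallInput above). If your job still does not run in Uberized mode, check the corresponding thresholds; the values below are the usual Hadoop 2.x defaults, shown only for illustration, not settings taken from this test:

-- at most this many map tasks (smallNumMapTasks); 9 is the usual default
set mapreduce.job.ubertask.maxmaps=9;
-- at most this many reduce tasks (smallNumReduceTasks); 1 is the usual default
set mapreduce.job.ubertask.maxreduces=1;
-- maximum total input size in bytes (smallInput); defaults to the HDFS block size
set mapreduce.job.ubertask.maxbytes=134217728;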

Now if you run the query again it will be executed in Uberized mode:

set mapreduce.job.ubertask.enable=true;
set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

SELECT 'A' FROM dual WHERE 1=1;

This task requires the allocation of the ApplicationMaster container only, which can save you some time when running queries.

Capacity Scheduler Preemption Tuning

In a busy environment your MapReduce tasks may often be killed to release cluster resources for running high-priority applications.

By default, when preemption is enabled (yarn.resourcemanager.scheduler.monitor.enable is set to true in yarn-site.xml), the Capacity Scheduler monitors resources every 3 seconds and kills the selected containers if they do not gracefully terminate within 15 seconds after receiving a termination request.

[Screenshot: killed task attempts for the MapReduce job]

These default settings may be too aggressive, and you can change them to allow MapReduce tasks to run longer before preemption. In the example above, you can see that the Map task was killed 3 times, and 2 of those times it was killed after only about 30 seconds of execution.

You can edit yarn-site.xml and set yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill to a higher value (time in milliseconds; the default is 15000) to allow your MapReduce tasks to complete their work:

<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill</name>
  <value>300000</value>
</property>
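
The monitoring interval mentioned above is also configurable. If you want the Capacity Scheduler to check for preemption less frequently, you can change the related property (the value below is just its usual default of 3000 milliseconds, shown for illustration, not a value from the cluster above):

<property>
  <!-- how often the preemption policy runs, in milliseconds (3000 is the usual default) -->
  <name>yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval</name>
  <value>3000</value>
</property>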

Hive on Tez – 2 Seconds (The Fastest Query, October 2014)

I am trying to estimate the fastest performance you can achieve in a live environment using Hive on Tez. What is the overhead of launching tasks on Tez?

Environment (Live production cluster):
[Screenshot: live production cluster utilization]

Let’s query the single-row, single-column table dual. Hive settings:

set hive.execution.engine=tez;
set hive.prewarm.enabled=true;
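
The hive.prewarm.enabled option pre-creates Tez containers for the session; the number of containers to pre-warm is controlled by a separate option, hive.prewarm.numcontainers (the value below is its usual default of 10, shown only for illustration, it was not tuned in this test):

-- number of Tez containers to pre-warm (10 is the usual default)
set hive.prewarm.numcontainers=10;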

Hive on Tez does not automatically allocate a session and containers; you have to launch some query first to warm up Tez. For this reason I did not take the first execution of the query into account. After the first attempt, the best attempt is as follows:

select 1 from dual where 1 != 0;
Query ID = v-dtolpeko_20141009085353_1303a726-cddb-421c-bdc7-d47db1678fa4
Total jobs = 1
Launching Job 1 out of 1

Status: Running (application id: application_1412375486094_125195)

Map 1: -/-
Map 1: 0/1
Map 1: 1/1
Status: Finished successfully
OK
1
Time taken: 2.05 seconds, Fetched: 1 row(s)

An attempt in a less busy environment:

[Screenshot: cluster utilization in a less busy period]

select 1 from dual where 1 != 0;
Query ID = v-dtolpeko_20141009085555_10aa1761-0ccd-422b-b147-b3391b5f512f
Total jobs = 1
Launching Job 1 out of 1

Status: Running (application id: application_1412375486094_125195)

Map 1: -/-
Map 1: 0/1
Map 1: 1/1
Status: Finished successfully
OK
1
Time taken: 1.791 seconds, Fetched: 1 row(s)

Just for comparison, let’s run on MapReduce:

set hive.execution.engine=mr;

select 1 from dual where 1 != 0;
Query ID = v-dtolpeko_20141009090707_43ff36eb-7eb5-46d3-a53a-1c383ce558b1
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1412375486094_125419, Tracking URL = http://chsxedw:8088/proxy/application_1412375486094_125419/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1412375486094_125419
Hadoop job information for Stage-1: number of mappers: 0; number of reducers: 0
2014-10-09 09:07:45,439 Stage-1 map = 0%,  reduce = 0%
2014-10-09 09:08:04,474 Stage-1 map = 100%,  reduce = 0%
Ended Job = job_1412375486094_125419
MapReduce Jobs Launched:
Job 0:  HDFS Read: 0 HDFS Write: 0 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
1
Time taken: 31.837 seconds, Fetched: 1 row(s)

You can see that Tez allows you to reduce the query start-up overhead to about 2 seconds, but still not to 0.01-0.1 seconds.