Uberized Tasks – Make MapReduce More Interactive

Compared with batch processing, interactive processing assumes that you get a response to your queries within a few seconds, or at least within a few dozen seconds. It is well known that MapReduce is good for processing large volumes of data in jobs that take dozens of minutes or even several hours to complete, but it is not so good for interactive queries.

Even a simple MapReduce job that processes a small amount of data, and does not even have a reducer, may take dozens of seconds in a busy production environment. The main reason for the slow performance is the significant overhead of launching tasks in YARN:

  • For each new MapReduce job, YARN allocates an ApplicationMaster container on one of the cluster nodes
  • The ApplicationMaster then requests that the YARN ResourceManager allocate one or more containers to run the MapReduce tasks

Let’s run a simple query that retrieves data from a single-row, single-column table and uses only 1 mapper (0 reducers):

SELECT 'A' FROM dual WHERE 1=1;

You can see that the cluster is quite busy, so each container allocation takes time:

(screenshot: cluster resource utilization showing a busy cluster)

Excerpt from logs:

00:45:43,041 - Parsing command: select 'A' from dual where 1=1
00:45:44,184 - Starting command: select 'A' from dual where 1=1
00:45:45,232 - Connecting to ResourceManager (by client)
00:45:48,459 - Submitted application
00:45:52,148 - Created MRAppMaster
00:45:55,742 - Connecting to ResourceManager (by AM)
00:45:58,184 - ContainerLauncher - CONTAINER_REMOTE_LAUNCH
00:45:58,246 - Transitioned from ASSIGNED to RUNNING
00:46:01,195 - JVM given task
00:46:04,181 - Progress of TaskAttempt is : 0.0
00:46:04,595 - Progress of TaskAttempt is : 1.0
00:46:04,677 - Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.85 sec
00:46:06,820 - Ended Job
Time taken: 23.8 seconds, Fetched: 1 row(s)

You can see that the query took 23.8 seconds, and the overhead of running a small MapReduce task in YARN is huge: about 75% of the time was spent on initialization and resource allocation:

  • 1 second to parse the query
  • 9 seconds to submit the query and launch the ApplicationMaster (00:45:43 – 00:45:52)
  • 6 seconds to initialize and launch the container for the Map task (00:45:52 – 00:45:58)
  • 3 seconds to initialize the JVM (00:45:58 – 00:46:01)
  • 6 seconds for the actual MapReduce work and cleanup (00:46:01 – 00:46:07)

Uberized Tasks

As you can see, the allocation of containers to perform MapReduce takes significant time, so for small tasks that deal with small volumes of data and apply only simple filters you can consider using Uberized tasks.

An Uber task means that the ApplicationMaster uses its own JVM to run the Map and Reduce tasks, so the tasks are executed sequentially on one node. In this case YARN has to allocate a container for the ApplicationMaster only.

To enable Uber tasks you have to set the following option in Hive:

set mapreduce.job.ubertask.enable=true;

But this may not be enough. It is also required that your MapReduce task needs less memory than is defined by the yarn.app.mapreduce.am.resource.mb option in the YARN configuration. This option defines how much memory the MR ApplicationMaster needs, and it has a default value of 1536 MB.

So make sure that mapreduce.map.memory.mb and mapreduce.reduce.memory.mb (even if you do not have a reduce step!) have values less than yarn.app.mapreduce.am.resource.mb. If not, use the SET statement to change the amount of memory for Map and Reduce tasks in Hive:

set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;
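
In the Hive CLI, a SET statement with a property name and no value prints the current setting, so you can check all three memory options before changing anything:

set yarn.app.mapreduce.am.resource.mb;
set mapreduce.map.memory.mb;
set mapreduce.reduce.memory.mb;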

For more information on how MapReduce decides whether it can run a job in uberized mode, see the makeUberDecision() method of the org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl class:

// MR_AM_VMEM_MB corresponds to yarn.app.mapreduce.am.resource.mb
long sysMemSizeForUberSlot = conf.getInt(MRJobConfig.MR_AM_VMEM_MB, 
                                         MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
...
// JOB_UBERTASK_ENABLE corresponds to mapreduce.job.ubertask.enable
boolean uberEnabled = conf.getBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
...
// The larger of the map and reduce memory requests must fit into the AM container,
// unless the memory limit is disabled
boolean smallMemory = ((Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0), 
                                 conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
            <= sysMemSizeForUberSlot) || 
     (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
...
// All conditions must hold for the job to run uberized
isUber = uberEnabled && smallNumMapTasks && smallNumReduceTasks && 
         smallInput && smallMemory && smallCpu && notChainJob; 
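
Memory is only one of the conditions here: smallNumMapTasks, smallNumReduceTasks and smallInput are controlled by their own options, which you can also set from the same Hive session. The values below are the usual defaults (at most 9 map tasks, at most 1 reduce task, and input up to one HDFS block):

-- At most 9 map tasks (default) qualify for uber mode
set mapreduce.job.ubertask.maxmaps=9;
-- At most 1 reduce task; uber mode does not support more than one reducer
set mapreduce.job.ubertask.maxreduces=1;
-- Maximum total input size; defaults to the HDFS block size (134217728 assumes 128 MB blocks)
set mapreduce.job.ubertask.maxbytes=134217728;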

Now if you run the query again, it will be executed in Uberized mode:

set mapreduce.job.ubertask.enable=true;
set mapreduce.map.memory.mb=1000;
set mapreduce.reduce.memory.mb=1000;

SELECT 'A' FROM dual WHERE 1=1;

This task requires the allocation of the ApplicationMaster container only, which can save you some time when running small queries.
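
To confirm that the job actually ran uberized, check the MapReduce client output or the job logs: the client reports the uber mode flag when the job starts, with a line similar to the following (the job ID here is just an example):

INFO mapreduce.Job: Job job_1411604439346_0001 running in uber mode : true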

Adding Columns to an Existing Table in Hive

Let’s see what happens to the existing data when you add new columns and then load new data into a table in Hive.

First we will create a table and load an initial data set as follows:

CREATE TABLE airfact
(
  origin STRING,
  dest STRING
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE; 

LOAD DATA LOCAL INPATH 'airfact1.txt' INTO TABLE airfact;

The sample airfact1.txt data file content (TAB-delimited file):

SFO     JFK
ORD     LAX

The LOAD DATA command just copies the specified file into the table directory; it does not perform any transformation or validation of the file.
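
You can verify this from the Hive CLI itself: the dfs command runs HDFS shell commands without leaving the session. The path below assumes the default warehouse location and the default database:

dfs -ls /user/hive/warehouse/airfact;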

Now let’s add 2 new columns to the table and load a file containing data in 4 columns:

ALTER TABLE airfact ADD COLUMNS (flight STRING, time STRING);

LOAD DATA LOCAL INPATH 'airfact2.txt' INTO TABLE airfact;

The sample airfact2.txt data file content:

SFO     JFK     Delta 2240              9:15p
ORD     LAX     Virgin America 241      5:35p
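
At this point DESCRIBE confirms the new four-column layout, while the files on disk remain unchanged:

DESCRIBE airfact;

origin    string
dest      string
flight    string
time      string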

Now if you retrieve the data from the airfact table, Hive returns NULL values for the flight and time columns in the old rows:

SELECT * FROM airfact;

(screenshot: query output with NULL values in the flight and time columns for the original rows)

If you browse the HDFS directory of the table, you can see the two original files that we loaded before:

(screenshot: HDFS table directory listing showing airfact1.txt and airfact2.txt)

So adding new columns to a table is a relatively cheap, metadata-only operation, as Hive does not modify the existing data files. When you then retrieve data from the table, Hive returns NULL values for the columns that do not exist in the old data files.
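
The NULL values also give you a simple way to tell the old rows from the new ones: only the rows loaded before the ALTER TABLE statement have NULL in the added columns. For example:

-- Returns only the two rows loaded from airfact1.txt
SELECT * FROM airfact WHERE flight IS NULL;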