Hive on MapReduce Can Launch Multiple ApplicationMasters per Single Query

Hive on MapReduce converts an HQL statement into one or more MapReduce jobs. For example, if you join two subqueries, Hive launches several jobs to execute the query:

SELECT * FROM
(
  SELECT DISTINCT SOURCE_NAME
  FROM LZ_OUT
  WHERE QDATE = '1900-01-01'
) w LEFT OUTER JOIN
(
  SELECT DISTINCT SOURCE_NAME
  FROM LZ_OUT
  WHERE QDATE != '1900-01-01'
) d ON w.SOURCE_NAME = d.SOURCE_NAME; 

Excerpt from the Hive log:

Launching Job 1 out of 4
Submitted application application_1412375486094_61806
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61806/
Stage-1: number of mappers: 67; number of reducers: 1
...
Launching Job 2 out of 4
Submitted application application_1412375486094_61842
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61842/
...
Launching Job 4 out of 4
Submitted application application_1412375486094_61863
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61863/

For each job, Hive asks the YARN ResourceManager to launch a separate ApplicationMaster, and each ApplicationMaster has to allocate containers for its job from scratch. This adds overhead to query execution in Hive.

For long-running batch queries, however, this approach means that if one of the jobs fails, it can be safely restarted without rerunning the whole query from the beginning.
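One way to see how many stages Hive plans for a statement, without running it, is to prefix the statement with EXPLAIN. This is a minimal sketch using the query above. The STAGE DEPENDENCIES section of the output lists all stages, and not every stage necessarily becomes a MapReduce job, so treat the count as a rough guide only:

```sql
-- EXPLAIN prints the execution plan instead of running the query.
-- The STAGE DEPENDENCIES section shows the stages and how they depend on each other.
EXPLAIN
SELECT * FROM
(
  SELECT DISTINCT SOURCE_NAME
  FROM LZ_OUT
  WHERE QDATE = '1900-01-01'
) w LEFT OUTER JOIN
(
  SELECT DISTINCT SOURCE_NAME
  FROM LZ_OUT
  WHERE QDATE != '1900-01-01'
) d ON w.SOURCE_NAME = d.SOURCE_NAME;
```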

Implementation Limitations of MapJoin in Hive 0.13 on MR

When you join a large (fact) table with a small (dimension) table, Hive can perform a map-side join. You might assume that multiple map tasks are started to read the large table, and that each mapper reads its own full copy of the small table and performs the join locally.

In Hive 0.13 on the MapReduce engine, the map join is implemented slightly differently and, unfortunately, not always optimally:

Step 1 – Download Side-table to the Hive Client machine

First, the data file of the side table is downloaded to the local disk of the Hive client machine, which typically is not a DataNode.

You can see this in the log:

Starting to launch local task to process map join;
Dump the side-table into file: file:/tmp/v-dtolpeko/hive_2014-10-01 ...
...
End of local task; Time Taken: 2.013 sec.
Execution completed successfully
MapredLocal task succeeded

Step 2 – Create HashTable and archive the file

Then Hive transforms the side-table data into a hash table and creates a .tar.gz archive file:

Archive 1 hash table files to file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz

Step 3 – Upload HashTable to HDFS

Hive uploads the archive file to HDFS and adds it to the Distributed Cache:

Upload 1 archive file  from file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz to hdfs://chsxe...
Add 1 archive file to distributed cache. Archive file: hdfs://chsxe... 

As you can see, the side table is downloaded from HDFS, transformed into a hash table, archived, and finally written back to HDFS.

You should take into account this behavior when optimizing Hive queries using MapJoin.
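For reference, the settings below are the usual ones that control whether Hive converts a join into a map join in the first place. This is only a sketch: the table names fact_sales and dim_store are hypothetical, and the threshold shown is the documented default, not a tuning recommendation.

```sql
-- Let Hive decide automatically whether a join can run as a map join.
SET hive.auto.convert.join=true;
-- Size threshold in bytes below which a table is treated as the small side.
SET hive.mapjoin.smalltable.filesize=25000000;

-- Hypothetical tables: fact_sales is large, dim_store is small.
SELECT f.store_id, d.store_name, f.amount
FROM fact_sales f
JOIN dim_store d ON f.store_id = d.store_id;
```

If the local download and upload of the side table described above costs more than it saves, setting hive.auto.convert.join to false for that query makes Hive fall back to a regular reduce-side join.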

Hadoop vs MPP – Joining 2 Large Tables – Optimization Using Bucket Map Join

In a data warehouse, you often need to join several large tables. Let’s assume you have ORDERS and ORDER_ITEMS tables defined as follows:

  CREATE TABLE orders
  (
    id INT,
    created DATE,
    customer VARCHAR(100)
  );

  CREATE TABLE order_items
  (
    id INT,
    order_id INT,
    product_id INT,
    qty INT,
    price DECIMAL(9,2)
  );

You can use the following query to get all order items:

  SELECT *
  FROM orders o, order_items i
  WHERE o.id = i.order_id;

Both tables can be very large, each containing hundreds of millions of rows.
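The bucket map join named in the title relies on both tables being bucketed on the join key, with bucket counts that are equal or multiples of each other. The following is a rough sketch of what that could look like for these two tables. The _bucketed table names and the bucket count of 32 are assumptions made for illustration, and whether Hive actually chooses a bucket map join also depends on the Hive version and other settings (older versions may additionally need a MAPJOIN hint).

```sql
-- Hypothetical bucketed copies of the two tables; 32 buckets is an arbitrary choice.
CREATE TABLE orders_bucketed
(
  id INT,
  created DATE,
  customer VARCHAR(100)
)
CLUSTERED BY (id) INTO 32 BUCKETS;

CREATE TABLE order_items_bucketed
(
  id INT,
  order_id INT,
  product_id INT,
  qty INT,
  price DECIMAL(9,2)
)
CLUSTERED BY (order_id) INTO 32 BUCKETS;

-- In Hive 0.x and 1.x this makes INSERT honor the bucket definition.
SET hive.enforce.bucketing=true;
INSERT OVERWRITE TABLE orders_bucketed SELECT * FROM orders;
INSERT OVERWRITE TABLE order_items_bucketed SELECT * FROM order_items;

-- Allow the optimizer to join matching buckets instead of whole tables.
SET hive.optimize.bucketmapjoin=true;

SELECT *
FROM orders_bucketed o
JOIN order_items_bucketed i ON o.id = i.order_id;
```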

Join in Traditional MPP Systems

First, let’s see how this problem is solved in traditional MPP systems such as Teradata, DB2 DPF, Greenplum and others.
