Hive on MapReduce converts an HQL statement into one or more MapReduce jobs. For example, if you have two subqueries joined together, Hive launches multiple jobs to execute the query:
SELECT * FROM
  (SELECT DISTINCT SOURCE_NAME
   FROM ...                           -- source table omitted in the original
   WHERE QDATE = '1900-01-01') w
LEFT OUTER JOIN
  (SELECT DISTINCT SOURCE_NAME
   FROM ...                           -- source table omitted in the original
   WHERE QDATE != '1900-01-01') d
ON w.SOURCE_NAME = d.SOURCE_NAME;
Excerpt from Hive log:
Launching Job 1 out of 4
Submitted application application_1412375486094_61806
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61806/
Stage-1: number of mappers: 67; number of reducers: 1
Launching Job 2 out of 4
Submitted application application_1412375486094_61842
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61842/
Launching Job 4 out of 4
Submitted application application_1412375486094_61863
The url to track the job: http://chsxedw:8088/proxy/application_1412375486094_61863/
For each job, Hive asks the YARN ResourceManager to launch a separate ApplicationMaster, and each ApplicationMaster has to allocate new containers for its job from scratch. This adds overhead to query execution in Hive.
For long-running batch queries, however, this approach ensures that if one of the jobs fails, it can be safely restarted, so there is no need to re-execute the query from the beginning.
When you need to join a large table (fact) with a small table (dimension), Hive can perform a map-side join. You may assume that multiple map tasks are started to read the large table, and that each mapper reads its own full copy of the small table and performs the join locally.
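As a sketch, a map join can be requested either through automatic conversion settings or an explicit hint; the table and column names below are made up for illustration:

```sql
-- Let Hive convert a join to a map join when the small table fits the threshold
SET hive.auto.convert.join = true;
-- Tables smaller than this size (in bytes) qualify as the small side
SET hive.mapjoin.smalltable.filesize = 25000000;

-- Or request a map join explicitly with a hint (legacy style);
-- fact_sales and dim_source are hypothetical tables
SELECT /*+ MAPJOIN(d) */ f.order_id, d.source_name
FROM fact_sales f
JOIN dim_source d ON f.source_id = d.id;
```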
In Hive 0.13 on the MapReduce engine, however, it is implemented in a slightly different and, unfortunately, not always optimal way:
Step 1 – Download the side table to the Hive client machine
First, the data file of the side table is downloaded to the local disk of the Hive client machine, which typically is not a DataNode.
You can see this from the log:
Starting to launch local task to process map join;
Dump the side-table into file: file:/tmp/v-dtolpeko/hive_2014-10-01 ...
End of local task; Time Taken: 2.013 sec.
Execution completed successfully
MapredLocal task succeeded
Step 2 – Create a hash table and archive the file
Then Hive transforms the side-table data into a hash table and creates a .gz archive file:
Archive 1 hash table files to file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz
Step 3 – Upload the hash table to HDFS
Hive uploads the .gz archive file to HDFS and adds it to the Distributed Cache:
Upload 1 archive file from file:/tmp/v-dtolpeko/hive_2014-10-01.../Stage-4.tar.gz to
Add 1 archive file to distributed cache. Archive file: hdfs://chsxe...
As you can see, the side table is downloaded from HDFS, transformed into a hash table, archived, and finally written back to HDFS.
You should take this behavior into account when optimizing Hive queries that use MapJoin.
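For reference, the following Hive 0.13 settings influence when and how this map-join conversion is performed; the values shown are the documented defaults, not tuning recommendations:

```sql
-- Convert to a map join unconditionally when the small tables fit in memory
SET hive.auto.convert.join.noconditionaltask = true;
SET hive.auto.convert.join.noconditionaltask.size = 10000000;  -- bytes
-- Abort the local hash-table task if it exceeds this share of its memory
SET hive.mapjoin.localtask.max.memory.usage = 0.90;
```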
In a data warehouse, you often need to join several large tables. Let’s assume you have ORDERS and ORDER_ITEMS tables defined as follows:
CREATE TABLE orders (id INT, ...);            -- other columns omitted in the original
CREATE TABLE order_items (order_id INT, ...); -- other columns omitted in the original
You can use the following query to get all order items:
SELECT *
FROM orders o, order_items i
WHERE o.id = i.order_id;
Both tables can be very large, containing hundreds of millions of rows each.
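One Hive-side option (an assumption here, not something prescribed by the text) is to bucket and sort both tables on the join key, so Hive can use a sort-merge-bucket join instead of a full shuffle; bucket counts and column lists are illustrative only:

```sql
-- Bucket both tables on the join key; other columns omitted
CREATE TABLE orders (id INT)
  CLUSTERED BY (id) SORTED BY (id) INTO 32 BUCKETS;
CREATE TABLE order_items (order_id INT)
  CLUSTERED BY (order_id) SORTED BY (order_id) INTO 32 BUCKETS;

-- Enable bucketed map joins and their sort-merge variant
SET hive.optimize.bucketmapjoin = true;
SET hive.optimize.bucketmapjoin.sortedmerge = true;
```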
Join in Traditional MPP Systems
First, let’s see how this problem is solved in traditional MPP systems such as Teradata, DB2 DPF, Greenplum and others.