In a data warehouse, you often need to join several large tables. Let’s assume you have ORDERS and ORDER_ITEMS tables defined as follows:
CREATE TABLE orders
(
   id INT,
   created DATE,
   customer VARCHAR(100)
);

CREATE TABLE order_items
(
   id INT,
   order_id INT,
   product_id INT,
   qty INT,
   price DECIMAL(9,2)
);
You can use the following query to get all order items:
SELECT * FROM orders o, order_items i WHERE o.id = i.order_id
Both tables can be very large, containing hundreds of millions of rows each.
Join in Traditional MPP Systems
First, let’s see how this problem is solved in traditional MPP systems such as Teradata, DB2 DPF, Greenplum and others.
MPP (Massively Parallel Processing) systems store data on multiple nodes using shared-nothing architecture.
You can define a hash distribution on the ID column for the ORDERS table, and on the ORDER_ID column for the ORDER_ITEMS table. This guarantees that for any given order all of its order item rows are located on the same node.
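The co-location idea can be sketched in a few lines of Python. This is an illustrative model, not actual MPP code: the trivial modulo hash and the sample rows are assumptions, but the principle is the same one the text describes.

```python
# Sketch of hash distribution: ORDERS rows are placed by hash(id),
# ORDER_ITEMS rows by hash(order_id), so related rows co-locate.
NUM_NODES = 2

def node_for(key):
    # A trivial hash stands in for the system's real hash function.
    return key % NUM_NODES

orders = [(1, "2015-01-01", "john"), (2, "2015-01-02", "mary")]
order_items = [(10, 1, 100, 2), (11, 1, 101, 1), (12, 2, 100, 3)]

nodes = {n: {"orders": [], "order_items": []} for n in range(NUM_NODES)}
for row in orders:
    nodes[node_for(row[0])]["orders"].append(row)       # key: id
for row in order_items:
    nodes[node_for(row[1])]["order_items"].append(row)  # key: order_id

# Every order and all of its items share a node, so each node can
# join its own portion of the data with no network traffic.
for data in nodes.values():
    local_ids = {o[0] for o in data["orders"]}
    assert all(i[1] in local_ids for i in data["order_items"])
```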
So when the join is performed, each node can join its own portion of the data; it does not need to communicate with other nodes or fetch their data for the join operation.
From the sample above you can see that all rows for orders 1, 3 and 5, including their order items, are located on Node 1, while Node 2 stores all rows for orders 2, 4 and 6.
MPP systems offer great performance for such joins since there is no network I/O overhead.
Reduce-Side Join in Hadoop
Hadoop is also a distributed processing system, but it is based on concepts different from those of Teradata, DB2 and other data warehousing systems.
Data is stored in HDFS, a distributed file system where a data file is split into multiple blocks (typically 128 or 256 MB each) and these blocks are distributed among all nodes in the cluster:
Since data is simply split evenly into blocks, there is no guarantee that the rows for a given order and its order items are located on the same node.
To join two large tables, all rows are shuffled, i.e. moved from the data nodes to the reducer nodes where the actual join is performed. This causes significant network I/O and processing overhead and, as a result, significantly reduces join performance.
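The map-shuffle-reduce flow above can be sketched in Python. This is a simplified single-process model of a reduce-side join, with made-up sample rows; on a real cluster the "shuffle" step is the expensive network transfer.

```python
# Sketch of a reduce-side (shuffle) join between ORDERS and ORDER_ITEMS.
from collections import defaultdict

orders = [(1, "2015-01-01", "john"), (2, "2015-01-02", "mary")]
order_items = [(10, 1, 100, 2), (11, 2, 101, 1)]

# Map phase: emit every row keyed by the join key, tagged by table.
mapped = [(o[0], ("O", o)) for o in orders] + \
         [(i[1], ("I", i)) for i in order_items]

# Shuffle phase: all rows with the same key are moved to one reducer.
shuffled = defaultdict(list)
for key, tagged_row in mapped:
    shuffled[key].append(tagged_row)

# Reduce phase: join rows that share a key.
joined = []
for key, rows in shuffled.items():
    order_rows = [r for tag, r in rows if tag == "O"]
    item_rows = [r for tag, r in rows if tag == "I"]
    joined.extend((o, i) for o in order_rows for i in item_rows)

# joined now holds each order paired with its items
```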
Bucket Map-Side Join in Hadoop
Hadoop Hive allows you to bucket data in tables by the values of specified columns. This works similarly to hash distribution: you define the columns and the number of buckets, which is typically chosen roughly in proportion to the number of nodes storing the data.
-- Hive DDL
CREATE TABLE orders
(
   id INT,
   created DATE,
   customer STRING
)
CLUSTERED BY (id) INTO 3 BUCKETS;

CREATE TABLE order_items
(
   id INT,
   order_id INT,
   product_id INT,
   qty INT,
   price DECIMAL
)
CLUSTERED BY (order_id) INTO 3 BUCKETS;
Now when you load data into the ORDERS and ORDER_ITEMS tables, the rows are distributed into 3 buckets. Physically this means that Hive creates 3 files in HDFS for each table:
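The bucket assignment can be modeled in Python. Hive's internal hash function differs, but the modulo step is the point; the order IDs here are made up for illustration.

```python
# Rough model of how rows map to bucket files.
NUM_BUCKETS = 3

def bucket_for(key):
    # Stand-in for Hive's hash; what matters is the modulo step.
    return key % NUM_BUCKETS

# Rows of ORDERS go to a bucket file by id, ORDER_ITEMS by order_id:
order_ids = [1, 2, 3, 4, 5, 6]
files = {b: [oid for oid in order_ids if bucket_for(oid) == b]
         for b in range(NUM_BUCKETS)}
# files -> {0: [3, 6], 1: [1, 4], 2: [2, 5]}
```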
Now you can see that since order IDs with the same hash value end up in same-numbered buckets for both the ORDERS and ORDER_ITEMS tables, it is possible to perform a map-side join in Hive (typically enabled via the hive.optimize.bucketmapjoin setting).
In this case Hadoop does not need to shuffle all the data. Each mapper can take the corresponding bucket files of the two tables and join them by order ID: for every order ID that produces a given hash value (0, 1 or 2 in our example), all of its order items are located in the bucket with the same hash value.
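A minimal sketch of this bucket map-side join, assuming pre-bucketed data and a simplified one-mapper-per-bucket model (the sample rows are made up):

```python
# Sketch of a bucket map-side join: each "mapper" joins only the
# pair of bucket files that share a bucket number.
NUM_BUCKETS = 3

def bucket_for(key):
    return key % NUM_BUCKETS

orders = [(1, "john"), (2, "mary"), (3, "ann"), (4, "bob")]
order_items = [(10, 1, 2), (11, 3, 1), (12, 4, 5)]  # (id, order_id, qty)

# Pre-bucketed "files", as Hive would store them on load.
order_buckets = {b: [o for o in orders if bucket_for(o[0]) == b]
                 for b in range(NUM_BUCKETS)}
item_buckets = {b: [i for i in order_items if bucket_for(i[1]) == b]
                for b in range(NUM_BUCKETS)}

# Each mapper handles one bucket number; no shuffle is required
# because matching keys can only occur in same-numbered buckets.
joined = []
for b in range(NUM_BUCKETS):
    by_id = {o[0]: o for o in order_buckets[b]}
    for item in item_buckets[b]:
        if item[1] in by_id:
            joined.append((by_id[item[1]], item))
```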
Is this the same as in MPP systems?
Although data is now split into buckets and a map-side join can be performed without a full data shuffle, the corresponding bucket files for the two tables can be located on different data nodes. Even if the map function runs locally on the node holding the bucket of the first table, it may still have to fetch the bucket file of the second table from another node.
In traditional MPP systems data locality is guaranteed: a node does not need to fetch data from other nodes to join tables distributed by the same key.
No matter which processing framework is used – MapReduce, Tez, Spark etc. – only data storage co-location can ensure high performance join of large related data sets.
To better match the capabilities of traditional MPP data warehouses, Hadoop HDFS could introduce some improvements:
- Since MPP systems use row-level data distribution between nodes while HDFS uses block-level distribution, HDFS could be extended to allow assigning a hash value to each new data block.
- Once a hash value is assigned, the storage location for the block is predefined: all blocks with the same hash value are stored on the same data node.
- Hadoop Hive would calculate a hash value for each bucket and use it as the HDFS hash value, so all buckets with the same hash value would be stored on the same data node.
- HDFS hashes would also be taken into account during replication, so all buckets would be replicated to the same nodes.
- In case of node failure, hashes can simply be ignored; this only degrades performance and does not affect system stability.
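The proposed placement policy can be sketched as follows. This is entirely hypothetical: `place_block` and the node list are illustrative names, not a real HDFS API, and the replica sequence is one possible design.

```python
# Hypothetical hash-aware block placement: a block's target data node
# is derived from its hash value, so same-hash blocks co-locate.
data_nodes = ["node1", "node2", "node3"]

def place_block(block_hash, replicas=2):
    # The primary location is fixed by the hash; replicas follow a
    # deterministic sequence so replicated buckets co-locate too.
    start = block_hash % len(data_nodes)
    return [data_nodes[(start + r) % len(data_nodes)]
            for r in range(replicas)]

# Buckets of ORDERS and ORDER_ITEMS with the same hash value get the
# same set of nodes, preserving locality for map-side joins:
assert place_block(1) == place_block(1)
```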
This improvement would ensure data locality for map-side joins on bucketed tables and, as a result, faster join operations.