Loading Files to Dynamic Partitions in Hive

Fact tables usually have a partition column that specifies the date (or hour) when the data was loaded. For example, a sales fact table can be defined as follows:

CREATE TABLE sales
(
  location_id INT, 
  product_id INT, 
  quantity INT, 
  price DECIMAL(10,2), 
  amount DECIMAL(10,2),
  trans_dtm TIMESTAMP
) 
  PARTITIONED BY (load_dt STRING);

Normally, when data for fact tables arrive daily (or hourly), you can just create a new named partition and load data:

INSERT OVERWRITE TABLE sales PARTITION (load_dt='20151111')
  SELECT ... FROM sales_stg;

But in the real world you have to design your ETL process to handle outages, to automatically reprocess data for previous periods, and so on. So your process must be able to handle multiple files in the staging area and load data into multiple target partitions.

Assume your HDFS staging area contains sales data for 2 days:

[Screenshot: listing of the /in/sales directory with one data file per day, e.g. sales_20151111.txt and sales_20151112.txt]

On top of this directory you can create an external table in Hive:

CREATE EXTERNAL TABLE sales_stg
(
  location_id STRING, 
  product_id STRING, 
  quantity STRING, 
  price STRING, 
  amount STRING,
  trans_dtm  STRING
) 
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE
  LOCATION '/in/sales';

This external table points to the directory that currently contains sales data files for 2 days. You can use the following approach to load the data into 2 partitions dynamically:

INSERT OVERWRITE TABLE sales PARTITION (load_dt)
  SELECT
    location_id, 
    product_id, 
    quantity, 
    price, 
    amount,
    trans_dtm,
    regexp_extract(INPUT__FILE__NAME, '.*sales_(.*)\\.txt', 1) AS load_dt
  FROM sales_stg;

INPUT__FILE__NAME is a virtual column that contains the name of the file the current row was read from. The partition value is extracted from the file name, so data is loaded into multiple partitions using a single INSERT statement.
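Note that dynamic partition inserts have to be enabled in the session first; since the partition column here is fully dynamic (no static key at all), Hive also requires nonstrict mode:

```sql
-- Enable dynamic partition inserts before running the INSERT above
SET hive.exec.dynamic.partition = true;
-- nonstrict allows all partition columns to be dynamic
SET hive.exec.dynamic.partition.mode = nonstrict;
```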

Reduce = 99% or Skewed Joins in Hive

Often, when running an HQL query, you may notice that it progresses to the 99% reduce stage quite fast and then gets stuck:

...
2014-10-07 08:46:01,149 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 6905.85 sec
2014-10-07 08:47:01,361 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 6999.45 sec
2014-10-07 08:48:01,441 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 7065.59 sec
2014-10-07 08:49:01,670 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 7125.26 sec
2014-10-07 08:50:01,808 Stage-8 map = 100%,  reduce = 99%, Cumulative CPU 7188.12 sec

The problem is that Hive estimates the progress based on the number of completed reducers, and this is not always relevant to the actual execution progress. A query can reach 99% in one minute and then spend an hour on the remaining 1%.

The most typical reason for this behavior is skewed data. For example, assume that you have a table that tracks all visits to specific sites, and SITE.COM has 100M rows while a dozen other sites (SITE.ORG, SITE.NET and so on) have just 10K visitors each.

Then when you join this table with another table on the site name, one reducer has to process 100M rows while the other reducers process just 10K rows each.

So if you have 99 sites with 10K visitors each, a single site with 100M visitors, and you specify 100 reducers, then 99% of the reducers will finish their work very quickly and you will have to wait a long time until the last reducer terminates.
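One common mitigation is Hive's skew join optimization, which processes heavy keys in a separate map join job instead of sending them all to one reducer. A sketch for our example (the table names visits and sites are hypothetical):

```sql
-- Keys with more rows than the threshold are handled by a follow-up
-- map join job rather than by a single overloaded reducer
SET hive.optimize.skewjoin = true;
SET hive.skewjoin.key = 100000;

SELECT s.site_name, COUNT(*) AS visit_cnt
FROM visits v
  JOIN sites s ON v.site_name = s.site_name
GROUP BY s.site_name;
```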

Not only joins

Data skew issues can arise not only in joins. For example, if you perform GROUP BY site_name in our example, then a single reducer has to deal with 100M rows while the others process a much smaller number of rows.
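For skewed aggregations, Hive offers a two-stage GROUP BY: partial aggregates are first distributed randomly across reducers, then merged by key in a second job, so no single reducer receives all 100M rows. A sketch, again using the hypothetical visits table:

```sql
-- Run GROUP BY as two MapReduce jobs to spread the hot key
SET hive.groupby.skewindata = true;

SELECT site_name, COUNT(*) AS visit_cnt
FROM visits
GROUP BY site_name;
```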