Loading Files to Dynamic Partitions in Hive

Fact tables usually have a partition column that specifies the date (or hour) when the data was loaded. For example, a sales fact table can be defined as follows:

CREATE TABLE sales
(
  location_id INT, 
  product_id INT, 
  quantity INT, 
  price DECIMAL(10,2), 
  amount DECIMAL(10,2),
  trans_dtm TIMESTAMP
) 
  PARTITIONED BY (load_dt STRING);

Normally, when data for fact tables arrive daily (or hourly), you can just create a new named partition and load data:

INSERT OVERWRITE TABLE sales PARTITION (load_dt='20151111')
  SELECT ... FROM sales_stg

But in the real world you have to design your ETL process to handle outages, to automatically reprocess data for previous periods, and so on. So you have to design your process to handle multiple files in the staging area and load data into multiple target partitions.

Assume your HDFS staging area contains sales data for two days:

[Image: HDFS directory listing of /in/sales showing one sales_<load_dt>.txt file per day]

On top of this directory you can create an external table in Hive:

CREATE EXTERNAL TABLE sales_stg
(
  location_id STRING, 
  product_id STRING, 
  quantity STRING, 
  price STRING, 
  amount STRING,
  trans_dtm  STRING
) 
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE
  LOCATION '/in/sales';

This external table points to the directory that currently contains sales data files for two days. You can use the following approach to load data into two partitions dynamically (dynamic partitioning has to be enabled, and because the partition column value is fully dynamic here, nonstrict mode is required):

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

INSERT OVERWRITE TABLE sales PARTITION (load_dt)
  SELECT
    location_id,
    product_id,
    quantity,
    price,
    amount,
    trans_dtm,
    regexp_extract(INPUT__FILE__NAME, '.*sales_(.*)\\.txt', 1) AS load_dt
  FROM sales_stg;

INPUT__FILE__NAME is a virtual column that contains the name of the file from which the current row was read. The partition value is extracted from the file name, so data is loaded into multiple partitions by a single SQL INSERT statement.
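After the load you can verify that a partition was created for each staging file, for example (a quick check; the partition values shown depend on the actual file names in the staging directory):

SHOW PARTITIONS sales;

-- Sample output for two daily files:
-- load_dt=20151110
-- load_dt=20151111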

Using Hive UDF to Perform Correlated Subquery

A correlated subquery is often used in traditional SQL databases to calculate the value of a result column using a complex expression that is not always possible to express with a join operator.

For example,

SELECT val,
  CASE 
    WHEN val BETWEEN 0 AND 7 THEN (SELECT score FROM scores WHERE score = val)
    WHEN val < 0 THEN 1 - (SELECT score FROM scores WHERE score * -1.00 = val)
    ELSE 0
  END AS score  
FROM vals

You can see that the resulting SCORE column is calculated using a CASE expression with two different subqueries against the SCORES table.

Let’s assume that SCORES is a small look-up table, so we can rewrite this query with a Hive UDF as follows:

SELECT val, fn_scores(val) AS score
FROM vals

Our Hive UDF reads data from a text file, builds a hash map and returns the required value. Here is its code:

import java.util.HashMap;
import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.hadoop.hive.ql.exec.UDF;

public class Scores extends UDF {
  private HashMap<Double, Double> map;

  // Actual UDF code
  public Double evaluate(Double score) {
    if (score == null) {
      return 0d;
    }
    else if (score >= 0 && score <= 7) { 
      return get(score);     
    } 
  else if (score < 0) {
    Double s = get(score * -1.00);      // Look up the negated (positive) value
    return s == null ? null : 1 - s;    // Return NULL when not found, as the SQL subquery would
  }
    return 0d;
  }

  private Double get(Double score) {
    if (map == null) {
       init();
    }
    return map.get(score);    
  }
 
  // Creating in-memory hash table for look-up (once per map task)
  private void init() {
    map = new HashMap<Double, Double>();
    try {
      BufferedReader file = new BufferedReader(new FileReader("score.txt"));
      String line = "";
      while ((line = file.readLine()) != null) {
        String parts[] = line.split("\t");
        map.put(Double.valueOf(parts[0]), Double.valueOf(parts[1]));
      }
      file.close();
    }
    catch (Exception e) {}   // Add proper error handling here
  }
}

This code assumes that the SCORES table data has been exported to a TAB-delimited score.txt file.
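One possible way to produce this file is to export the look-up table from Hive (a sketch only; it assumes SCORES has exactly two columns, the key followed by the value, and requires Hive 0.11+ for the ROW FORMAT clause):

-- Write a TAB-delimited copy of the look-up table to a local directory
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/scores_export'
  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
  SELECT * FROM scores;

The generated file can then be renamed to score.txt.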

The next step is to compile the class and create a JAR file:

javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Scores.java
jar cf Scores.jar Scores.class

Before we can use our UDF we have to run the following statements in Hive:

-- Add JAR and TXT files to distributed cache so they are available on each cluster node
ADD JAR Scores.jar;
ADD FILE score.txt;

-- Register function
CREATE TEMPORARY FUNCTION fn_scores AS 'Scores';

Now when we use this UDF in Hive queries, the score.txt file is available on each node (similar to a replicated/broadcast join), loaded into an in-memory hash table by the UDF and used to look up data. The initialization is performed once per map task.

Spark: Reading Sequence Files Generated by Hive

You may need to work with SequenceFiles generated by Hive for some tables. Let's see how to deal with such files in Spark.

Load Data into a Hive Table

Assume we are given a TAB-delimited data file having the following content:

AT    Austria
BE    Belgium
BY    Belarus
EE    Estonia
FR    France
DE    Germany
GB    United Kingdom
US    United States

First create a table (TextFile format for now) and load the file into it from the local file system:

CREATE TABLE states_raw
(
   code STRING,
   name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE; 
 
LOAD DATA LOCAL INPATH 'states.txt' INTO TABLE states_raw;

Now let's load the data into a table stored in SequenceFile format:

CREATE TABLE states_seq
(
   code STRING,
   name STRING
)
STORED AS SEQUENCEFILE
LOCATION '/user/dmtolpeko/states_seq';

INSERT INTO TABLE states_seq SELECT code, name FROM states_raw;

You can see the SequenceFile created in the table directory /user/dmtolpeko/states_seq:

[Image: HDFS listing of /user/dmtolpeko/states_seq showing the SequenceFile written by the INSERT statement]

If you study the content of the SequenceFile, you can notice the following:

  • Key is org.apache.hadoop.io.BytesWritable
  • Key is set to NULL for all SequenceFile records
  • Value is org.apache.hadoop.io.Text
  • Value contains all columns separated by '\01' by default

[Image: content of the SequenceFile showing NULL keys and '\01'-delimited column values]

As you can see, an uncompressed SequenceFile looks like a regular text file, except that a key field is added, and it stores a NULL value.

Working with SequenceFile in Spark

Now we are going to work with this file in Spark. First, we create an RDD as follows:

import org.apache.hadoop.io._

val file = sc.sequenceFile[BytesWritable, String]("hdfs://hdm:8020/user/dmtolpeko/states_seq")

Note that if the directory contains multiple SequenceFiles, all of them will be added to the RDD.
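If you need only one specific file rather than the whole directory, you can point the path at that file directly (a sketch; the file name 000000_0 is just an example, as actual Hive output file names vary):

val one = sc.sequenceFile[BytesWritable, String]("hdfs://hdm:8020/user/dmtolpeko/states_seq/000000_0")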

If you try to perform any actions on this RDD, you will receive a serialization error caused by the org.apache.hadoop.io.BytesWritable key:

file.collect
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0
(TID 0) had a not serializable result: org.apache.hadoop.io.BytesWritable
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$
   DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
...

Before you can perform any actions, you have to convert the BytesWritable key, for example to a plain byte array:

file.map(x => (x._1.copyBytes(), x._2)).collect

The result of execution (formatted):

res: Array[(Array[Byte], String)] = Array(
  (Array(), AT?Austria), 
  (Array(), BE?Belgium), 
  (Array(), BY?Belarus), 
  (Array(), EE?Estonia), 
  (Array(), FR?France), 
  (Array(), DE?Germany), 
  (Array(), GB?United Kingdom), 
  (Array(), US?United States))

You can see that the key is an empty byte array (the NULL value), and the value contains the concatenated values of all columns. Let's get rid of the NULL key and transform the SequenceFile RDD into more meaningful key-value pairs:

file.map(x => x._2.split('\01')).map(x => (x(0), x(1))).collect

The result of execution (formatted):

res: Array[(String, String)] = Array(
  (AT, Austria), 
  (BE, Belgium), 
  (BY, Belarus), 
  (EE, Estonia), 
  (FR, France), 
  (DE, Germany), 
  (GB, United Kingdom), 
  (US, United States))

Once you have transformed the SequenceFile RDD, you can use its data in reduce and group-by operations as well as in map-side joins.
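For example, a map-side (broadcast) join against this small look-up data may look as follows (a sketch; the sales RDD and its values are made up for illustration):

// Key-value pairs extracted from the SequenceFile
val states = file.map(x => x._2.split('\01')).map(x => (x(0), x(1)))

// Broadcast the small look-up table to every executor
val statesMap = sc.broadcast(states.collectAsMap())

// Hypothetical RDD of (state code, amount) pairs to be enriched
val sales = sc.parallelize(Seq(("AT", 100.0), ("DE", 250.0), ("US", 70.0)))

// Map-side join: each record is resolved against the broadcast map, no shuffle is needed
val joined = sales.map { case (code, amount) =>
  (code, statesMap.value.getOrElse(code, "Unknown"), amount)
}

joined.collect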