Spark: Reading Sequence Files Generated by Hive

You may need to work with SequenceFiles generated by Hive for some of your tables. Let’s see how to deal with such files in Spark.

Load Data into a Hive Table

Assume we are given a TAB-delimited data file having the following content:

AT    Austria
BE    Belgium
BY    Belarus
EE    Estonia
FR    France
DE    Germany
GB    United Kingdom
US    United States

First create a table in TextFile format and load the data file into it (LOAD DATA LOCAL copies the local file into the table’s directory in HDFS):

CREATE TABLE states_raw
(
   code STRING,
   name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE; 
 
LOAD DATA LOCAL INPATH 'states.txt' INTO TABLE states_raw;

Now let’s load the data into a table stored in SequenceFile format:

CREATE TABLE states_seq
(
   code STRING,
   name STRING
)
STORED AS SEQUENCEFILE
LOCATION '/user/dmtolpeko/states_seq';

INSERT INTO TABLE states_seq SELECT code, name FROM states_raw;

You can see the SequenceFile created in the table directory /user/dmtolpeko/states_seq:

[Screenshot: file listing of the /user/dmtolpeko/states_seq directory showing the SequenceFile created by the INSERT]
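
If you want to double-check this from spark-shell (a minimal sketch; it assumes the shell is connected to the same cluster), you can list the table directory with the Hadoop FileSystem API:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the files Hive wrote into the table directory
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path("/user/dmtolpeko/states_seq")).foreach(s => println(s.getPath + "  " + s.getLen))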

If you study the content of the SequenceFile, you will notice the following:

  • The key class is org.apache.hadoop.io.BytesWritable
  • The key is set to NULL (an empty byte array) for all SequenceFile records
  • The value class is org.apache.hadoop.io.Text
  • The value contains all columns separated by '\01' by default

[Screenshot: content of the SequenceFile showing the NULL keys and the '\01'-delimited values]

As you can see, an uncompressed SequenceFile looks much like a regular text file; the only difference is the added key field, which stores a NULL value.
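
You can also confirm the key and value classes from spark-shell with the SequenceFile reader API (a sketch; the file name 000000_0 is only an assumption about what Hive produced in the table directory):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile

// Open one of the files written by Hive and print its key/value classes
val reader = new SequenceFile.Reader(sc.hadoopConfiguration,
  SequenceFile.Reader.file(new Path("/user/dmtolpeko/states_seq/000000_0")))
println(reader.getKeyClassName)    // org.apache.hadoop.io.BytesWritable
println(reader.getValueClassName)  // org.apache.hadoop.io.Text
reader.close()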

Working with SequenceFile in Spark

Now we are going to work with this file in Spark. First, we create an RDD as follows:

import org.apache.hadoop.io._

// Key is BytesWritable; the Text value is read as a String
val file = sc.sequenceFile[BytesWritable, String]("hdfs://hdm:8020/user/dmtolpeko/states_seq")

Note that if the directory contains multiple SequenceFiles, all of them will be added to the RDD.
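
As a quick check (illustrative), you can look at how many partitions Spark created for the input; each file typically contributes at least one partition:

// Number of input splits/partitions created for the directory
println(file.partitions.length)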

If you try to perform any actions on this RDD, you will receive a serialization error caused by the org.apache.hadoop.io.BytesWritable key:

file.collect
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0
(TID 0) had a not serializable result: org.apache.hadoop.io.BytesWritable
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$
   DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
...

Before you can perform any actions, you have to convert the BytesWritable key:

file.map(x => (x._1.copyBytes(), x._2)).collect

The result of the execution (formatted):

res: Array[(Array[Byte], String)] = Array(
  (Array(), AT?Austria), 
  (Array(), BE?Belgium), 
  (Array(), BY?Belarus), 
  (Array(), EE?Estonia), 
  (Array(), FR?France), 
  (Array(), DE?Germany), 
  (Array(), GB?United Kingdom), 
  (Array(), US?United States))

You can see that the key is an empty byte array (the NULL value), while the value contains the values of all columns concatenated with the '\01' separator (shown as ? above). Let’s get rid of the NULL key and transform the SequenceFile RDD into more meaningful key-value pairs:

file.map(x => x._2.split('\01')).map(x => (x(0), x(1))).collect

The result of the execution (formatted):

res: Array[(String, String)] = Array(
  (AT, Austria), 
  (BE, Belgium), 
  (BY, Belarus), 
  (EE, Estonia), 
  (FR, France), 
  (DE, Germany), 
  (GB, United Kingdom), 
  (US, United States))

Once you have transformed the SequenceFile RDD, you can use its data in reduce and group-by operations as well as in map-side joins.
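
For example (an illustrative sketch), you can build the (code, name) pairs once and run a simple reduce over them:

import org.apache.spark.SparkContext._

// (code, name) pairs built from the SequenceFile values
val states = file.map(x => x._2.split('\01')).map(x => (x(0), x(1)))

// Count countries per first letter of the code using reduceByKey
states.map{case(code, name) => (code.substring(0, 1), 1)}.reduceByKey(_ + _).collect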

Map-Side Join in Spark

Joining two or more data sets is one of the most common operations you perform on your data, but in distributed systems it can be a huge headache. Since the data are distributed among many nodes, they usually have to be shuffled before the join, which causes significant network I/O and slow performance.

Fortunately, if you need to join a large table (fact) with relatively small tables (dimensions), i.e. to perform a star-schema join, you can avoid sending all the data of the large table over the network. This type of join is called a map-side join in the Hadoop community. In other distributed systems, it is often called a replicated or broadcast join.

Let’s use the following sample data (one fact and two dimension tables):

// Fact table
val flights = sc.parallelize(List(
  ("SEA", "JFK", "DL", "418",  "7:00"),
  ("SFO", "LAX", "AA", "1250", "7:05"),
  ("SFO", "JFK", "VX", "12",   "7:05"),
  ("JFK", "LAX", "DL", "424",  "7:10"),
  ("LAX", "SEA", "DL", "5737", "7:10")))  
  
// Dimension table
val airports = sc.parallelize(List(
  ("JFK", "John F. Kennedy International Airport", "New York", "NY"),
  ("LAX", "Los Angeles International Airport", "Los Angeles", "CA"),
  ("SEA", "Seattle-Tacoma International Airport", "Seattle", "WA"),
  ("SFO", "San Francisco International Airport", "San Francisco", "CA")))
  
// Dimension table
val airlines = sc.parallelize(List(
  ("AA", "American Airlines"), 
  ("DL", "Delta Airlines"), 
  ("VX", "Virgin America")))   

We need to join the fact and dimension tables to get the following result:

Seattle           New York       Delta Airlines       418   7:00
San Francisco     Los Angeles    American Airlines    1250  7:05
San Francisco     New York       Virgin America       12    7:05
New York          Los Angeles    Delta Airlines       424   7:10
Los Angeles       Seattle        Delta Airlines       5737  7:10

The fact table can be very large, while dimension tables are often quite small. Let’s collect the dimension tables to the Spark driver, create maps, and broadcast them to each worker node:

val airportsMap = sc.broadcast(airports.map{case(a, b, c, d) => (a, c)}.collectAsMap)
val airlinesMap = sc.broadcast(airlines.collectAsMap)

Now you can run the map-side join:

flights.map{case(a, b, c, d, e) => 
   (airportsMap.value.get(a).get, 
    airportsMap.value.get(b).get, 
    airlinesMap.value.get(c).get, d, e)}.collect

The result of the execution (formatted):

res: Array[(String, String, String, String, String)] = Array(
  (Seattle, New York, Delta Airlines, 418, 7:00), 
  (San Francisco, Los Angeles, American Airlines, 1250, 7:05), 
  (San Francisco, New York, Virgin America, 12, 7:05), 
  (New York, Los Angeles, Delta Airlines, 424, 7:10), 
  (Los Angeles, Seattle, Delta Airlines, 5737, 7:10))

How it Works

First we created an RDD for each table. airports and airlines are the dimension tables that we are going to use in the map-side join, so we converted them to maps and broadcast them to each executor node. Note that we extracted only 2 columns from the airports table.

Then we just applied the map function to each row of the flights table and retrieved the dimension values from airportsMap and airlinesMap. If the flights table is very large, the map function is executed concurrently for each partition, and every executor has its own copy of the airportsMap and airlinesMap maps.
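
Note that airportsMap.value.get(a).get throws an exception if a code is missing from the broadcast map. A slightly more defensive variant (just a sketch, falling back to the raw code when the lookup fails) could look like this:

// Same map-side join, but using getOrElse instead of get(...).get
flights.map{case(a, b, c, d, e) => 
   (airportsMap.value.getOrElse(a, a), 
    airportsMap.value.getOrElse(b, b), 
    airlinesMap.value.getOrElse(c, c), d, e)}.collect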

This approach allows us to avoid shuffling the fact table and to get quite good join performance.