Spark: Reading Sequence Files Generated by Hive

You may need to work with Sequence files generated by Hive for some table. Let’s see how we can deal with such files in Spark.

Load Data into a Hive Table

Assume we are given a TAB-delimited data file having the following content:

AT    Austria
BE    Belgium
BY    Belarus
EE    Estonia
FR    France
DE    Germany
GB    United Kingdom
US    United States

First, create a table and load the file into it, which copies the file to HDFS (TextFile format so far):

CREATE TABLE states_raw
(
   code STRING,
   name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
LOAD DATA LOCAL INPATH 'states.txt' INTO TABLE states_raw;

Now let’s load data into a table in SequenceFile format:

CREATE TABLE states_seq
(
   code STRING,
   name STRING
)
STORED AS SEQUENCEFILE
LOCATION '/user/dmtolpeko/states_seq';

INSERT INTO TABLE states_seq SELECT code, name FROM states_raw;

Hive creates a sequence file in the table directory /user/dmtolpeko/states_seq.

If you study the content of the SequenceFile, you can notice the following:

  • Key is of type BytesWritable
  • Key is set to NULL for all SequenceFile records
  • Value is of type Text
  • Value contains all columns separated by '\01' by default


As you can see, an uncompressed SequenceFile looks like a regular text file, except that a Key field is added that stores a NULL value.
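
To make the value layout concrete, here is a minimal sketch in plain Scala (no Spark or Hive required). It assumes the default delimiter described above; the object and method names are hypothetical and only serve as an illustration:

```scala
object HiveRowLayout {
  // Default Hive field delimiter (Ctrl-A), written as '\01' in Hive.
  val FieldDelimiter: Char = '\u0001'

  // Join column values the way they appear in the SequenceFile value.
  def encode(columns: Seq[String]): String =
    columns.mkString(FieldDelimiter.toString)

  // Replace the non-printable delimiter so a value can be inspected in a console.
  def printable(value: String): String =
    value.replace(FieldDelimiter, '|')
}
```

For example, HiveRowLayout.printable(HiveRowLayout.encode(Seq("AT", "Austria"))) gives a string in which the delimiter between the two columns is visible.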

Working with SequenceFile in Spark

Now we are going to work with this file in Spark. First, we create an RDD as follows:


import org.apache.hadoop.io.BytesWritable
val file = sc.sequenceFile[BytesWritable, String]("hdfs://hdm:8020/user/dmtolpeko/states_seq")

Note that if the directory contains multiple SequenceFiles, all of them will be added to the RDD.

If you try to perform any action on this RDD, you will receive a serialization error caused by the key:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0
(TID 0) had a not serializable result:

Before you can perform any actions, you have to convert the BytesWritable key, for example to a byte array:

file.map(x => (x._1.copyBytes(), x._2)).collect

The result of execution (formatted):

res: Array[(Array[Byte], String)] = Array(
  (Array(), AT?Austria), 
  (Array(), BE?Belgium), 
  (Array(), BY?Belarus), 
  (Array(), EE?Estonia), 
  (Array(), FR?France), 
  (Array(), DE?Germany), 
  (Array(), GB?United Kingdom), 
  (Array(), US?United States))

You can see that the key is an empty byte array (NULL value), and the value contains the concatenated values of all columns. Let’s get rid of the NULL key and transform the SequenceFile RDD into more meaningful key-value pairs:

file.map(x => x._2.split('\u0001')).map(x => (x(0), x(1))).collect

The result of execution (formatted):

res: Array[(String, String)] = Array(
  (AT, Austria), 
  (BE, Belgium), 
  (BY, Belarus), 
  (EE, Estonia), 
  (FR, France), 
  (DE, Germany), 
  (GB, United Kingdom), 
  (US, United States))
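
Indexing the split result directly fails on a record with fewer columns than expected. A slightly more defensive parser could look like the sketch below. It is plain Scala; the State case class and the StateParser object are hypothetical names introduced for this example, and it assumes the two-column layout of the states table:

```scala
object StateParser {
  // One row of the states table.
  final case class State(code: String, name: String)

  // Split a SequenceFile value on the default Hive delimiter.
  // The limit of -1 keeps trailing empty columns instead of dropping them.
  def parse(value: String): Option[State] =
    value.split("\u0001", -1) match {
      case Array(code, name) => Some(State(code, name))
      case _                 => None // unexpected number of columns
    }
}
```

Applied to the RDD, this would presumably be used as file.flatMap(x => StateParser.parse(x._2)), which skips malformed records instead of failing the job.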

Once you have transformed the SequenceFile RDD, you can use its data in reduce and group-by operations as well as in map-side joins.
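
As an illustration of the map-side join idea, the sketch below shows the per-partition logic in plain Scala. It assumes a second, hypothetical dataset of (code, count) pairs called visits; neither that dataset nor the MapSideJoin object is part of the example above. In Spark, the small states table would typically be collected with collectAsMap, shipped to the executors with sc.broadcast, and a function like this one called inside mapPartitions:

```scala
object MapSideJoin {
  // Join records against a small in-memory lookup table.
  // Records whose code is missing from the lookup are dropped (inner join).
  def joinWithStates(
      visits: Iterator[(String, Int)],
      states: Map[String, String]): Iterator[(String, Int)] =
    visits.flatMap { case (code, count) =>
      states.get(code).map(name => (name, count))
    }
}
```

Because the lookup table is available on every executor, the larger dataset does not have to be shuffled for the join.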