Access Amazon S3 from Spark

The first step is to specify AWS Hadoop libraries when launching PySpark:

./bin/pyspark --packages org.apache.hadoop:hadoop-aws:2.7.1

Then before you can access objects on Amazon S3, you have to specify your access keys:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<key>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey","<key>")

Now let’s open a file and calculate the number of rows in it:

f=sc.textFile("s3n://epic/dmtolpeko/fs_sum.txt")
f.count()

For my sample file the result is as follows:

199
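
The Scala shell can be used in the same way; below is a minimal sketch, assuming the same hadoop-aws package, with <key>, <bucket> and <path> as placeholders:

// Launch the shell with the AWS Hadoop libraries:
// ./bin/spark-shell --packages org.apache.hadoop:hadoop-aws:2.7.1

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<key>")

val f = sc.textFile("s3n://<bucket>/<path>/file.txt")
f.count()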

Spark on YARN Submit Errors on Hortonworks

When you start Spark on YARN using the Spark shell as follows:

spark/bin/spark-shell --master yarn-client

You can get the following errors on Hortonworks:

...
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

<console>:10: error: not found: value sqlContext
       import sqlContext.implicits._
<console>:10: error: not found: value sqlContext
       import sqlContext.sql

Additionally, when you open the Application Master log, you can see:

Log Type: stderr
Log Upload Time: Tue Nov 17 06:59:35 -0800 2015
Log Length: 87
Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

To solve this issue, edit spark-defaults.conf and specify:

spark.driver.extraJavaOptions -Dhdp.version=current
spark.yarn.am.extraJavaOptions -Dhdp.version=current
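
Alternatively, the same options can be passed on the command line when launching the shell, which should have the same effect as editing spark-defaults.conf:

spark/bin/spark-shell --master yarn-client \
  --driver-java-options "-Dhdp.version=current" \
  --conf spark.yarn.am.extraJavaOptions=-Dhdp.version=current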

In my case this helped launch the Spark shell successfully, and I could see the command prompt:

15/11/17 07:37:05 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala>

I used Spark 1.5.2 and HDP 2.2.4.8.

Spark: Reading Sequence Files Generated by Hive

You may need to work with SequenceFiles generated by Hive for some of your tables. Let's see how we can deal with such files in Spark.

Load Data into a Hive Table

Assume we are given a TAB-delimited data file having the following content:

AT    Austria
BE    Belgium
BY    Belarus
EE    Estonia
FR    France
DE    Germany
GB    United Kingdom
US    United States

First create a table in TextFile format and load the data file into it (LOAD DATA LOCAL copies the file to the table directory in HDFS):

CREATE TABLE states_raw
(
   code STRING,
   name STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE; 
 
LOAD DATA LOCAL INPATH 'states.txt' INTO TABLE states_raw;

Now let's load the data into a table in SequenceFile format:

CREATE TABLE states_seq
(
   code STRING,
   name STRING
)
STORED AS SEQUENCEFILE
LOCATION '/user/dmtolpeko/states_seq';

INSERT INTO TABLE states_seq SELECT code, name FROM states_raw;

You can see a sequence file created in the table directory /user/dmtolpeko/states_seq:

[Screenshot: the SequenceFile created in the table directory]

If you study the content of the SequenceFile, you can notice the following:

  • Key is org.apache.hadoop.io.BytesWritable
  • Key is set to NULL for all SequenceFile records
  • Value is org.apache.hadoop.io.Text
  • Value contains all columns separated by '\01' by default

[Screenshot: content of the SequenceFile]

As you can see, an uncompressed SequenceFile looks like a regular text file; the only difference is the new Key field, which stores a NULL value.
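
You can also inspect the file from the command line; hadoop fs -text decodes SequenceFiles into plain text (the actual file name inside the directory, such as 000000_0, depends on what Hive produced):

hadoop fs -ls /user/dmtolpeko/states_seq
hadoop fs -text /user/dmtolpeko/states_seq/000000_0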

Working with SequenceFile in Spark

Now we are going to work with this file in Spark. First we create an RDD as follows:

import org.apache.hadoop.io._

val file=sc.sequenceFile[BytesWritable,String]("hdfs://hdm:8020/user/dmtolpeko/states_seq")

Note that if the directory contains multiple SequenceFiles, all of them will be added to the RDD.

If you try to perform any actions on this RDD, you will receive a serialization error due to the org.apache.hadoop.io.BytesWritable key:

file.collect
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0
(TID 0) had a not serializable result: org.apache.hadoop.io.BytesWritable
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$
   DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
...

Before you can perform any actions, you have to convert the BytesWritable key:

file.map(x => (x._1.copyBytes(), x._2)).collect

The result of execution (formatted):

res: Array[(Array[Byte], String)] = Array(
  (Array(), AT?Austria), 
  (Array(), BE?Belgium), 
  (Array(), BY?Belarus), 
  (Array(), EE?Estonia), 
  (Array(), FR?France), 
  (Array(), DE?Germany), 
  (Array(), GB?United Kingdom), 
  (Array(), US?United States))

You can see that the key is an empty byte array (the NULL value), and the value contains the concatenated values of all columns; the '?' in the output is the non-printable '\01' separator. Let's get rid of the NULL key and transform the SequenceFile RDD into more meaningful key-value pairs:

file.map(x => x._2.split('\01')).map(x => (x(0), x(1))).collect

The result of execution (formatted):

res: Array[(String, String)] = Array(
  (AT, Austria), 
  (BE, Belgium), 
  (BY, Belarus), 
  (EE, Estonia), 
  (FR, France), 
  (DE, Germany), 
  (GB, United Kingdom), 
  (US, United States))

Once you have transformed the SequenceFile RDD, you can use its data in reduce and group-by operations as well as map-side joins.
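
For example, here is a minimal sketch that reuses the transformed pairs in a reduceByKey and a join; the capitals RDD is made-up data for illustration only:

val states = file.map(x => x._2.split('\01')).map(x => (x(0), x(1)))

// Count rows per country code (each code appears once in this sample, so all counts are 1)
states.map { case (code, _) => (code, 1) }.reduceByKey(_ + _).collect

// Join with another key-value RDD (hypothetical data)
val capitals = sc.parallelize(Seq(("AT", "Vienna"), ("FR", "Paris")))
states.join(capitals).collect
// e.g. Array((AT,(Austria,Vienna)), (FR,(France,Paris))), order may vary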