Spark on YARN Submit Errors on Hortonworks

When you start Spark on YARN using the Spark shell as follows:

spark/bin/spark-shell --master yarn-client

You may get the following errors on Hortonworks:

...
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

<console>:10: error: not found: value sqlContext
       import sqlContext.implicits._
<console>:10: error: not found: value sqlContext
       import sqlContext.sql

Additionally, when you open the Application Master log, you can see:

Log Type: stderr
Log Upload Time: Tue Nov 17 06:59:35 -0800 2015
Log Length: 87
Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

This happens because on HDP the class path entries contain the ${hdp.version} placeholder, which has to be supplied as a Java system property. To solve this issue, edit spark-defaults.conf and specify:

spark.driver.extraJavaOptions -Dhdp.version=current
spark.yarn.am.extraJavaOptions -Dhdp.version=current

In my case this helped launch the Spark shell successfully, and I could see the command prompt:

15/11/17 07:37:05 INFO repl.SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala>

I used Spark 1.5.2 and HDP 2.2.4.8.
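
Alternatively, instead of editing spark-defaults.conf, the same properties can be passed on the command line when starting the Spark shell (a sketch; adjust the spark-shell path to your installation):

spark/bin/spark-shell --master yarn-client \
  --conf spark.driver.extraJavaOptions=-Dhdp.version=current \
  --conf spark.yarn.am.extraJavaOptions=-Dhdp.version=current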

Loading Files to Dynamic Partitions in Hive

Fact tables usually have a partition column that specifies the date (or hour) when the data were loaded. For example, the sales fact table can be defined as follows:

CREATE TABLE sales
(
  location_id INT, 
  product_id INT, 
  quantity INT, 
  price DECIMAL(10,2), 
  amount DECIMAL(10,2),
  trans_dtm TIMESTAMP
) 
  PARTITIONED BY (load_dt STRING);

Normally, when data for fact tables arrive daily (or hourly), you can just create a new named partition and load data:

INSERT OVERWRITE TABLE sales PARTITION (load_dt='20151111')
  SELECT ... FROM sales_stg

But in the real world you have to design your ETL process to handle outages, automatically reprocess data for previous periods, and so on. This means the process has to handle multiple files in the staging area and load data into multiple target partitions.

Assume your HDFS staging area contains sales data for 2 days:

[screenshot: HDFS directory listing of /in/sales with sales files for 2 days]
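
For illustration, the directory might contain one tab-delimited file per day, named by its load date (these file names are assumed; only the sales_<date>.txt naming pattern matters for the load step below):

/in/sales/sales_20151111.txt
/in/sales/sales_20151112.txt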

On top of this directory you can create an external table in Hive:

CREATE EXTERNAL TABLE sales_stg
(
  location_id STRING, 
  product_id STRING, 
  quantity STRING, 
  price STRING, 
  amount STRING,
  trans_dtm  STRING
) 
  ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  STORED AS TEXTFILE
  LOCATION '/in/sales';
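
To check which files Hive reads through this external table, you can query the INPUT__FILE__NAME virtual column (the same column is used in the load step below):

SELECT INPUT__FILE__NAME, COUNT(*)
FROM sales_stg
GROUP BY INPUT__FILE__NAME;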

This external table points to the directory that currently contains sales data files for 2 days. You can use the following approach to load data into 2 partitions dynamically:

INSERT OVERWRITE TABLE sales PARTITION (load_dt)
  SELECT
    location_id, 
    product_id, 
    quantity, 
    price, 
    amount,
    trans_dtm,
    regexp_extract(INPUT__FILE__NAME,'.*sales_(.*)\.txt', 1) AS load_dt
  FROM sales_stg
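
Note that, depending on your Hive version and configuration, dynamic partitioning may have to be enabled before running this statement; nonstrict mode is required here because no static partition value is specified:

SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;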

INPUT__FILE__NAME is a virtual column that contains the name of the file the current row was read from. The partition value is extracted from the file name, so data are loaded into multiple partitions by a single SQL INSERT statement.
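
As a quick check after the load, you can list the partitions that were created:

SHOW PARTITIONS sales;

With the file names assumed above, this should return the load_dt=20151111 and load_dt=20151112 partitions.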