FIRST_VALUE, LAST_VALUE, LEAD and LAG in Spark

I needed to migrate a MapReduce job to Spark. The job had itself been migrated from SQL :) and its reducer contains implementations of the FIRST_VALUE, LAST_VALUE, LEAD and LAG analytic window functions.

In this article I would like to share some ideas on how to implement the FIRST_VALUE and LAST_VALUE analytic functions in Spark (not Spark SQL). It is quite easy to extend the code to implement the LEAD and LAG functions with any specified offset.

Let’s define a sample data set as follows (the first column is the grouping key, the second is the value):

val data=sc.parallelize(List(
   ("A", "A1"),
   ("A", "A2"),
   ("A", "A3"),
   ("B", "B1"),
   ("B", "B2"),
   ("C", "C1")))

The result we need:

Group key    Value     First Value in Group      Last Value in Group
---------    -----     --------------------      -------------------
        A       A1                       A1                       A3  
        A       A2                       A1                       A3   
        A       A3                       A1                       A3   
        B       B1                       B1                       B2   
        B       B2                       B1                       B2 
        C       C1                       C1                       C1 

I defined a function that returns each value together with the first and last values in its group:

def firstLastValue(items: Iterable[String]) =
  for (i <- items) yield (i, items.head, items.last)

Now I can group rows by a key and get first and last values:

data.groupByKey().map{case(a, b)=>(a, firstLastValue(b))}.collect 

The result of the execution (formatted):

res: Array[(String, Iterable[(String, String, String)])] = Array(
 (B, List((B1,B1,B2), 
          (B2,B1,B2))), 
 (A, List((A1,A1,A3), 
          (A2,A1,A3), 
          (A3,A1,A3))),
 (C, List((C1,C1,C1))))

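Note that I used groupByKey, not reduceByKey, because we need to work with the entire window. Also note that groupByKey does not guarantee the order of values within a group, so on real data you should sort the values before taking the first and last ones.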

How It Works

First, we use the Spark groupByKey function to collect and group all values for each key in the data set. As a result, for each key we get the key and the collection of all values for this key.

The next step is to iterate through all values and return a tuple containing the value itself as well as the first and last value in the collection. You can extend this approach to get the values from the previous and following rows with any offset you need.
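To make the LEAD and LAG extension more concrete, here is a minimal sketch in plain Scala (no Spark required). The object name LeadLagSketch and the function leadLag are hypothetical and not part of the original job. It assumes the values of one group are already in the order you want, and it returns None where the offset runs past the start or end of the group.

```scala
object LeadLagSketch {
  // For each item, return (value, LAG value, LEAD value) for the given offset.
  // None marks positions where the offset falls outside the group.
  def leadLag(items: Seq[String],
              offset: Int = 1): Seq[(String, Option[String], Option[String])] = {
    val indexed = items.toIndexedSeq   // cheap access by position
    indexed.indices.map { i =>
      (indexed(i), indexed.lift(i - offset), indexed.lift(i + offset))
    }
  }

  // Possible use with the RDD from this article (assumption: sorting the
  // values alphabetically gives the intended window order):
  //   data.groupByKey().mapValues(v => leadLag(v.toSeq.sorted))
}
```

For the group A this should give (A1, None, Some(A2)), (A2, Some(A1), Some(A3)), (A3, Some(A2), None). Using Option instead of null is a design choice of this sketch; a SQL-like default value could be substituted with getOrElse.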

Multi-Column Key and Value – Reduce a Tuple in Spark

In many tutorials a key-value pair consists of two single scalar values, for example (‘Apple’, 7). But key-value is a general concept: both the key and the value often consist of multiple fields, and neither has to be unique.

Consider a typical SQL statement:

SELECT store, product, SUM(amount), MIN(amount), MAX(amount), SUM(units)
FROM sales
GROUP BY store, product

The columns store and product can be considered the key, and the columns amount and units the values.

Let’s implement this SQL statement in Spark. Firstly we define a sample data set:

val sales=sc.parallelize(List(
   ("West",  "Apple",  2.0, 10),
   ("West",  "Apple",  3.0, 15),
   ("West",  "Orange", 5.0, 15),
   ("South", "Orange", 3.0, 9),
   ("South", "Orange", 6.0, 18),
   ("East",  "Milk",   5.0, 5)))

The Spark/Scala code equivalent to the SQL statement is as follows:

sales.map{ case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }.
  reduceByKey((x, y) => 
   (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)).collect

The result of the execution (formatted):

res: Array[((String, String), (Double, Double, Double, Int))] = Array(
  ((West, Orange), (5.0, 5.0, 5.0, 15)), 
  ((West, Apple),  (5.0, 2.0, 3.0, 25)), 
  ((East, Milk),   (5.0, 5.0, 5.0, 5)), 
  ((South, Orange),(9.0, 3.0, 6.0, 27)))

How It Works

We have an input RDD sales containing 6 rows and 4 columns (String, String, Double, Int). The first step is to define which columns belong to the key and which to the value. You can use the map function on the RDD as follows:

sales.map{ case (store, prod, amt, units) => ((store, prod), (amt, amt, amt, units)) }

We defined a key-value tuple where the key is itself a tuple containing (store, prod) and the value is a tuple containing the results we are going to calculate: (amt, amt, amt, units).

Note that we initialized SUM, MIN and MAX with amt, so if there is only one row in a group then SUM, MIN, MAX values will be the same and equal to amt.

The next step is to reduce values by key:

  reduceByKey((x, y) => 

In this function x is the result of reducing the values seen so far, and y is the next value to merge in. Remember that both x and y are tuples with the same layout as (amt, amt, amt, units).

Reduce works like adding 2 + 4 + 3 + 5 + 6 + …: you take the first 2 values, add them, then take the 3rd value, add it, and so on. Because Spark may combine values in any order and grouping, the function passed to reduceByKey has to be associative and commutative, which holds for sum, min and max.
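The following plain Scala sketch (no Spark required) illustrates why the order of reduction does not matter for this function. The object name ReduceOrderSketch and the three sample values are made up for illustration; only the body of combine is taken from the article.

```scala
object ReduceOrderSketch {
  type Agg = (Double, Double, Double, Int)   // (sum, min, max, units)

  // The same combining logic that is passed to reduceByKey in this article.
  def combine(x: Agg, y: Agg): Agg =
    (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)

  // Three hypothetical rows for one key, already mapped to (amt, amt, amt, units).
  val a: Agg = (2.0, 2.0, 2.0, 10)
  val b: Agg = (3.0, 3.0, 3.0, 15)
  val c: Agg = (6.0, 6.0, 6.0, 18)

  val leftFirst  = combine(combine(a, b), c)   // (a, b) first, then c
  val rightFirst = combine(a, combine(b, c))   // (b, c) first, then a
  val swapped    = combine(c, combine(b, a))   // operands in reverse order
}
```

All three values should be equal to (11.0, 2.0, 6.0, 43), which is what allows Spark to merge partial results from different partitions in any order.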

Now it is easier to understand the meaning of

reduceByKey((x, y) => 
  (x._1 + y._1, math.min(x._2, y._2), math.max(x._3, y._3), x._4 + y._4)).collect

Sum of the previous and current amounts (_1 means the first item of a tuple):

x._1 + y._1

Selecting MIN amount:

math.min(x._2, y._2)

Selecting MAX amount:

math.max(x._3, y._3)

Sum of the previous and current units:

x._4 + y._4
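If the positional accessors _1 to _4 become hard to read, one possible alternative is a small case class with named fields. The sketch below is plain Scala (no Spark required) and uses groupBy plus reduce on a local List as a stand-in for map plus reduceByKey. The names SalesAggSketch, Agg and merge are hypothetical.

```scala
object SalesAggSketch {
  // Named fields instead of _1 .. _4.
  case class Agg(sum: Double, min: Double, max: Double, units: Int) {
    def merge(o: Agg): Agg =
      Agg(sum + o.sum, math.min(min, o.min), math.max(max, o.max), units + o.units)
  }

  // The same sample rows as in the article, held in a local List.
  val sales = List(
    ("West",  "Apple",  2.0, 10),
    ("West",  "Apple",  3.0, 15),
    ("West",  "Orange", 5.0, 15),
    ("South", "Orange", 3.0, 9),
    ("South", "Orange", 6.0, 18),
    ("East",  "Milk",   5.0, 5))

  // Local stand-in for map + reduceByKey: build (key, Agg) pairs,
  // group them by the composite key, then reduce each group.
  val result: Map[(String, String), Agg] =
    sales
      .map { case (store, prod, amt, units) => ((store, prod), Agg(amt, amt, amt, units)) }
      .groupBy(_._1)
      .map { case (key, rows) => key -> rows.map(_._2).reduce(_ merge _) }
}
```

On an RDD the same idea would presumably be written as a map to ((store, prod), Agg(...)) followed by reduceByKey(_ merge _); that variant is an assumption here and was not run as part of this article.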

How to Select Specified Columns – Projection in Spark

Projection, i.e. selecting specified columns from a data set, is one of the basic data manipulation operations. How do you do it in Spark?

Consider a sample data set, a TAB-delimited file airports.txt containing:

Los Angeles International Airport        LAX     Los Angeles     CA
San Francisco International Airport      SFO     San Francisco   CA
Seattle–Tacoma International Airport     SEA     Seattle         WA

Let’s select only the 3rd and 2nd columns, in that order, and create TAB-delimited file(s) in the airports_out directory containing:

Los Angeles      LAX
San Francisco    SFO
Seattle          SEA

Below is Scala code to achieve this using Spark:

Create RDD for the source file

The source file is located in HDFS.

val file = sc.textFile("hdfs://hdm:8020/user/dmtolpeko/airports.txt")

Select the specified columns in the specified order

Use the map function to

a) split each line into an array of strings
b) select the specified items (columns) from the array and create the resulting string (line)

val file2 = file.map(_.split("\t")).map(c => c(2) + "\t" + c(1))
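The column selection can be generalized to any list of column positions. The sketch below is plain Scala (no Spark required) and works on a local List of lines. The object name ProjectionSketch and the helper project are hypothetical, and column positions are zero-based, as in c(2) and c(1) above.

```scala
object ProjectionSketch {
  // Pick the given zero-based columns, in the given order,
  // from a TAB-delimited line and join them with TABs again.
  def project(line: String, cols: Seq[Int]): String = {
    val fields = line.split("\t", -1)   // -1 keeps trailing empty fields
    cols.map(fields(_)).mkString("\t")
  }

  // The sample rows from airports.txt as local strings.
  val lines = List(
    "Los Angeles International Airport\tLAX\tLos Angeles\tCA",
    "San Francisco International Airport\tSFO\tSan Francisco\tCA",
    "Seattle–Tacoma International Airport\tSEA\tSeattle\tWA")

  // 3rd column first, then 2nd column.
  val projected = lines.map(project(_, Seq(2, 1)))
}
```

With an RDD of lines the same helper could be applied as file.map(project(_, Seq(2, 1))). A line with fewer columns than requested would throw an exception in this sketch, so real input may need validation first.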

Save the results

Save the results to an HDFS directory:

file2.saveAsTextFile("hdfs://hdm:8020/user/dmtolpeko/airports_out")

Note that you have to specify a directory for the output, and the result may contain multiple files, one part file for each partition of the RDD.