I needed to migrate a MapReduce job to Spark, but the job had previously been migrated from SQL and contained an implementation of the FIRST_VALUE, LAST_VALUE, LEAD and LAG analytic window functions in its reducer.

In this article I would like to share some ideas on how to implement the FIRST_VALUE and LAST_VALUE analytic functions in Spark (not Spark SQL). It is quite easy to extend the code to implement the LEAD and LAG functions with any specified offset.

Let’s define a sample data set as follows (the first column is the grouping key, the second is the value):

```scala
val data = sc.parallelize(List(
  ("A", "A1"), ("A", "A2"), ("A", "A3"),
  ("B", "B1"), ("B", "B2"),
  ("C", "C1")))
```

The result we need:

```
Group key  Value  First Value in Group  Last Value in Group
---------  -----  --------------------  -------------------
A          A1     A1                    A3
A          A2     A1                    A3
A          A3     A1                    A3
B          B1     B1                    B2
B          B2     B1                    B2
C          C1     C1                    C1
```

I defined a function that returns first and last value in a group:

```scala
def firstLastValue(items: Iterable[String]) =
  for { i <- items } yield (i, items.head, items.last)
```
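As a quick sanity check outside Spark, the helper can be run on a plain Scala collection (the values below are one group from the sample data set above):

```scala
// Self-contained copy of the helper for a local test.
def firstLastValue(items: Iterable[String]) =
  for { i <- items } yield (i, items.head, items.last)

// Running it on group A's values:
val result = firstLastValue(List("A1", "A2", "A3")).toList
// result: List((A1,A1,A3), (A2,A1,A3), (A3,A1,A3))
```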

Now I can group rows by a key and get first and last values:

```scala
data.groupByKey().map { case (a, b) => (a, firstLastValue(b)) }.collect
```

The result of the execution (formatted):

```
res: Array[(String, Iterable[(String, String, String)])] = Array(
  (B, List((B1,B1,B2), (B2,B1,B2))),
  (A, List((A1,A1,A3), (A2,A1,A3), (A3,A1,A3))),
  (C, List((C1,C1,C1))))
```

Note that I used `groupByKey`, not `reduceByKey`, as we need to work with the entire window.

#### How It Works

First we use the Spark `groupByKey` function to collect and group all values for each key in the data set. As a result, for each key we get the key itself and the collection of all values for that key.

The next step is to iterate through all the values and return a tuple containing the value itself together with the first and last values in the collection. You can extend this approach to get the values from the preceding and following rows with any offset you need.
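As a sketch of that extension (the helper name `leadLagValue` is my own; like `firstLastValue`, it works on one group's values, and missing neighbours are represented as `None`), LEAD and LAG with an arbitrary offset can be implemented by pairing each element with its index:

```scala
// Hypothetical helper: for each value, return (value, lead(offset), lag(offset)).
def leadLagValue(items: Iterable[String], offset: Int) = {
  val seq = items.toIndexedSeq          // indexed access for O(1) lookups
  for ((v, i) <- seq.zipWithIndex) yield (
    v,
    seq.lift(i + offset),               // LEAD: value `offset` rows ahead, or None
    seq.lift(i - offset)                // LAG: value `offset` rows behind, or None
  )
}
```

It plugs into the same pipeline, e.g. `data.groupByKey().map { case (a, b) => (a, leadLagValue(b, 1)) }`.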