Parsing JSON Columns in Apache Pig

Quite often you have to deal with data sets that contain JSON objects in only some of their columns. For example, consider the following event stream data containing JSON in the payload column:

id timestamp type payload
1 2016-11-09 11:11:13.235 A {"a":"1","b":"2","c":"3"}
2 2016-11-09 13:18:34.000 A {"a":"1","b":"8","c":"0"}
3 2016-11-09 15:20:31.568 A {"a":"3","b":"7","c":"6"}

Parsing Simple JSON Objects

If the JSON data contains just key-value pairs, i.e. {"key":"value","key2":"value2",...}, you can use the JsonStringToMap UDF to extract the required values by key. For example:

-- JsonStringToMap is not a built-in UDF, so you have to register it
register '/usr/lib/pig/lib/*.jar';
define JsonStringToMap 
  com.twitter.elephantbird.pig.piggybank.JsonStringToMap();

-- Load data
d = load 'data.txt' using PigStorage() 
  as (id:chararray, ts:chararray, type:chararray, payload:chararray);

-- Transform JSON object to Pig MAP
d1 = foreach d generate JsonStringToMap(payload) as payload;

-- Now you can work with JSON object as a Pig MAP and extract values
d2 = foreach d1 generate payload#'a', payload#'b', payload#'c';

dump d2;

The last statement outputs the values of the a, b, and c keys for every row:

(1,2,3)
(1,8,0)
(3,7,6)

Parsing JSON Arrays

Unfortunately, the JsonStringToMap UDF does not work with JSON arrays, i.e. data in the following format:

[
 {"key":"value","key2":"value2",...},
 {"key":"valueA","key2":"value2A",...},
 ...
]

There are a dozen custom UDFs written to work with JSON arrays, but I would like to show you how to write your own Python UDF that iterates over a JSON array and extracts any required data.

Let's assume that the payload column now contains the following data:

[{"a":"1"},{"b":"2"},{"c":"3"}]
[{"a":"1"},{"a":"8"}]
[{"a":"3","b":"7","c":"6"}]

Here we have 3 rows, and every row contains a JSON array. Note that the first array contains 3 JSON objects, the second array contains 2 objects, and the third array contains just one JSON object (with 3 key-value pairs).

Let's write a Pig UDF in Python that returns the number of elements in the array and the last value of key a in each array:

import com.xhaus.jyson.JysonCodec as json

@outputSchema('result:(cnt:int, a:int)')
def getItems(data):
    cnt = 0
    val = 0
    jdata = json.loads(data)
    # iterate items in JSON array, each item is a JSON object
    for i in jdata:
        cnt += 1
        # check whether key "a" exists in the current JSON object
        if "a" in i:
            # JSON values are strings, cast to int to match the output schema
            val = int(i["a"])
    return (cnt, val)

Now we can invoke our UDF as follows:

register './getItems.py' USING jython as gi;

d1 = foreach d generate gi.getItems(payload);

dump d1;

The last statement outputs the number of items in the array and the last value of a for every row:

(3,1)
(2,8)
(1,3)

You can also extend the Python UDF to parse JSON structures of any complexity.
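
For example, here is a rough sketch of a UDF that also descends into nested objects. The payload layout and the extracted field are hypothetical (an array of objects where each item may contain a nested "details" object with a "city" key), and the same Jython JSON codec as above is assumed:

import com.xhaus.jyson.JysonCodec as json

# Hypothetical payload: [{"id":"1","details":{"city":"London"}}, ...]
@outputSchema('result:(cnt:int, city:chararray)')
def getNestedItems(data):
    cnt = 0
    city = None
    jdata = json.loads(data)
    # iterate items in JSON array, each item is a JSON object
    for i in jdata:
        cnt += 1
        # descend into the nested "details" object when it is present
        if "details" in i:
            details = i["details"]
            if "city" in details:
                city = details["city"]
    return (cnt, city)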

UDF to Return a Tuple of Bags in Apache Pig

Typically a user-defined function (UDF) is a scalar function, i.e. it returns a single value, and quite often you need to call many UDFs to get different calculations for the same input data.

In some cases parsing the same data over and over again can be expensive, so it makes sense to write a UDF that processes the input data once but returns multiple values, i.e. it returns a tuple.

Let's complicate our example a little bit and write a UDF that returns a tuple of arrays (bags in terms of Pig Latin, lists in terms of Python).

For example, assume we have a string of comma-separated numbers (many rows, but a single column) as follows:

1,2,10,5,15,8
4,31,2,76,3,16
...

We want to split each list of numbers into 2 lists: one for odd numbers and another for even numbers. Here is the UDF code in Python:

# Pig UDF returns a tuple of 2 bags
@outputSchema('result:tuple(odd:{t:(val:int)},even:{t:(val:int)})')
def getOddEven(data):
    odd_bag = []
    even_bag = []
    # split the comma-separated string and route each number to the proper bag
    for i in str(data).split(','):
        if int(i) % 2 != 0:
            odd_bag.append(int(i))
        else:
            even_bag.append(int(i))
    return (odd_bag, even_bag)
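
By the way, the function uses only plain Python, so you can sanity-check it outside of Pig. The snippet below is just a local test sketch: the no-op outputSchema stub replaces the decorator that Pig normally provides at registration time, and the function is repeated so the snippet runs standalone:

# Local test only: stub the decorator that Pig supplies at runtime
def outputSchema(schema):
    return lambda func: func

@outputSchema('result:tuple(odd:{t:(val:int)},even:{t:(val:int)})')
def getOddEven(data):
    odd_bag = []
    even_bag = []
    for i in str(data).split(','):
        if int(i) % 2 != 0:
            odd_bag.append(int(i))
        else:
            even_bag.append(int(i))
    return (odd_bag, even_bag)

print(getOddEven('1,2,10,5,15,8'))
# ([1, 5, 15], [2, 10, 8])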

Put this Python code into a file getOddEven.py and run the following Pig script:

-- Register UDF
register './getOddEven.py' USING jython as udf;

-- Load input data from a file
d = load 's3://epic/dmtolpeko/data.txt' 
         using PigStorage() as (c1:chararray);

-- Transforming data using UDF
s = foreach d generate udf.getOddEven(c1);

-- Show the results
dump s;

Here is the result:

{(1),(5),(15)}, {(2),(10),(8)}
{(31),(3)},     {(4),(2),(76),(16)}

You can see that the getOddEven function transformed the input data into 2 bags: one for odd numbers and another for even numbers, and unlike the traditional scalar UDF approach, we parsed the input data only once.

Access Amazon S3 from Spark

The first step is to specify AWS Hadoop libraries when launching PySpark:

./bin/pyspark --packages org.apache.hadoop:hadoop-aws:2.7.1

Then before you can access objects on Amazon S3, you have to specify your access keys:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<key>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey","<key>")

Now let’s open a file and calculate the number of rows in it:

f=sc.textFile("s3n://epic/dmtolpeko/fs_sum.txt")
f.count()

For my sample file the result is as follows:

199
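
The returned RDD can then be processed like any other data source. For example, here is a quick sketch (the layout of fs_sum.txt does not matter here, so only generic string operations are used):

# Look at the first few lines of the file
f.take(3)

# Count only non-empty lines
f.filter(lambda line: len(line.strip()) > 0).count()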