UDF to Return a Tuple of Bags in Apache Pig

Typically, a user-defined function (UDF) is a scalar function, i.e. it returns a single value, and quite often you need to call several UDFs to perform different calculations on the same input data.
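
For instance, with the scalar approach you might end up with several UDFs that each re-parse the same string. The two functions below are only a hypothetical sketch for contrast: each returns a single value, and each splits the input on its own:

# Hypothetical scalar UDFs: each one re-parses the same input string
@outputSchema('odd_count:int')
def countOdd(data):
    return sum(1 for i in str(data).split(',') if int(i) % 2 != 0)

@outputSchema('even_count:int')
def countEven(data):
    return sum(1 for i in str(data).split(',') if int(i) % 2 == 0)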

In some cases parsing the same data over and over again can be expensive, so it makes sense to write a UDF that processes the input data once but returns multiple values, i.e. a tuple.

Let’s complicate our example a little and write a UDF that returns a tuple of arrays (bags in Pig Latin terms, lists in Python terms).

For example, assume we have rows of comma-separated numbers (many rows, but a single column) as follows:

1,2,10,5,15,8
4,31,2,76,3,16
...

And we want to split each list of numbers into 2 lists: one for odd and another for even numbers. Here is the UDF code in Python:

# Pig UDF that returns a tuple of 2 bags
@outputSchema('result:tuple(odd:{t:(val:int)},even:{t:(val:int)})')
def getOddEven(data):
    odd_bag = []
    even_bag = []
    # Parse the comma-separated string once, routing each value into the proper list
    for i in str(data).split(','):
        if int(i) % 2 != 0:
            odd_bag.append(int(i))
        else:
            even_bag.append(int(i))
    # A Python tuple of two lists maps to a Pig tuple of two bags
    return (odd_bag, even_bag)
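
Before wiring this into Pig you can sanity-check the logic in a plain Python shell. The outputSchema decorator is provided by Pig's Jython runtime, so a no-op stub (a testing-only assumption) is needed to make the definition above work outside Pig:

>>> def outputSchema(schema):      # stub so the decorator resolves outside Pig
...     return lambda func: func
...
>>> # paste the getOddEven definition from above, then:
>>> getOddEven('1,2,10,5,15,8')
([1, 5, 15], [2, 10, 8])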

Save the UDF code into a file getOddEven.py and run the following Pig script:

-- Register UDF
register './getOddEven.py' using jython as udf;

-- Load input data from a file
d = load 's3://epic/dmtolpeko/data.txt' 
         using PigStorage() as (c1:chararray);

-- Transform the data using the UDF
s = foreach d generate udf.getOddEven(c1);

-- Show the results
dump s;

Here is the result:

{(1),(5),(15)}, {(2),(10),(8)}
{(31),(3)},     {(4),(2),(76),(16)}

You can see that the getOddEven function transformed the input data into 2 bags: one for odd numbers and another for even numbers, and unlike the traditional scalar UDF approach, we parsed the input data only once.

Access Amazon S3 from Spark

The first step is to specify the AWS Hadoop library when launching PySpark:

./bin/pyspark --packages org.apache.hadoop:hadoop-aws:2.7.1
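
Alternatively, the same dependency can be declared in code through the spark.jars.packages property before the SparkContext is created (a sketch for a standalone script rather than the interactive shell):

# Sketch: pull in hadoop-aws via spark.jars.packages instead of --packages
from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .setAppName("s3-row-count") \
    .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.1")
sc = SparkContext(conf=conf)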

Then before you can access objects on Amazon S3, you have to specify your access keys:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<key>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "<key>")
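
Hard-coding keys in a script is risky, so one option is to read them from the standard AWS environment variables instead (a sketch; AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the usual variable names):

import os

# Read the credentials from environment variables instead of hard-coding them
hconf = sc._jsc.hadoopConfiguration()
hconf.set("fs.s3n.awsAccessKeyId", os.environ["AWS_ACCESS_KEY_ID"])
hconf.set("fs.s3n.awsSecretAccessKey", os.environ["AWS_SECRET_ACCESS_KEY"])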

Now let’s open a file and calculate the number of rows in it:

f = sc.textFile("s3n://epic/dmtolpeko/fs_sum.txt")
f.count()

For my sample file the result is as follows:

199
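
count() is just one of many available actions; for example, you can preview a few rows of the same RDD or count only non-empty lines (a quick sketch that makes no assumption about the file format):

# Preview the first rows and count the non-empty lines
f.take(3)
f.filter(lambda line: len(line.strip()) > 0).count()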