Using Python UDF to Aggregate Data in Apache Pig

Apache Pig allows you to use the GROUP statement to combine data by a key, and unlike SQL, you do not need to apply an aggregate function such as SUM, MAX, or MIN to return a single row for each group. Pig simply collects the values for each key into a separate bag that you can iterate over and transform as needed.

Let’s consider the following weather data set containing state, city, annual high temperature, and annual low temperature:

CA,Irvine,73,54
CA,San Diego,70,58
CA,Berkeley,68,48
CA,San Jose,73,51
MA,Boston,59,44
MA,Bedford,60,38
MA,Charlton,58,36
WA,Seattle,60,45
WA,Tacoma,62,45

Now we will group data by state and see the results:

-- Load input data from a file
d = load 's3://epic/dmtolpeko/meteo.txt' 
  using PigStorage(',') as (state:chararray, city:chararray, 
                            high:chararray, low:chararray);

-- Group data by state
g = group d by state; 

-- Show the results
dump g;
(CA,{(CA,San Jose,73,51),(CA,Berkeley,68,48),(CA,San Diego,70,58),(CA,Irvine,73,54)})
(MA,{(MA,Charlton,58,36),(MA,Bedford,60,38),(MA,Boston,59,44)})
(WA,{(WA,Tacoma,62,45),(WA,Seattle,60,45)})

You can see that the data is grouped by key, and we did not need to apply an aggregate function, as GROUP BY in SQL would require.
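To make the grouping behavior concrete, here is a minimal plain-Python sketch of what GROUP produces: one collection of whole rows per key, with nothing aggregated. It only illustrates the result shape and is not how Pig implements grouping. The name group_by_state is hypothetical, and a Python list keeps input order whereas a Pig bag is unordered.

```python
from collections import defaultdict

# Sample rows from the data set above: (state, city, high, low)
rows = [
    ("CA", "Irvine", "73", "54"),
    ("CA", "San Diego", "70", "58"),
    ("CA", "Berkeley", "68", "48"),
    ("CA", "San Jose", "73", "51"),
    ("MA", "Boston", "59", "44"),
    ("MA", "Bedford", "60", "38"),
    ("MA", "Charlton", "58", "36"),
    ("WA", "Seattle", "60", "45"),
    ("WA", "Tacoma", "62", "45"),
]

def group_by_state(rows):
    """Collect whole rows into one list (a stand-in for a Pig bag) per state."""
    groups = defaultdict(list)
    for row in rows:
        # row[0] is the state, which plays the role of the group key
        groups[row[0]].append(row)
    return dict(groups)

grouped = group_by_state(rows)
for state in sorted(grouped):
    print(state, grouped[state])
```

Each key maps to all of its original rows, which is why a later step is free to pick, reshape, or aggregate them in any way.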

Now let’s write a Python UDF that iterates over the items in each group and returns only the first 2 rows, keeping just the city and the low temperature.

Note that you can get the same result in pure Pig syntax; this example is only intended to show how to handle bag items inside a Python UDF, which can be useful for implementing more complex transformations and aggregations.

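# Pig UDF returns a bag of 2-element tuples
@outputSchema('result:{t:(city:chararray,low:chararray)}')
def getCitiesLow(data):
    result = []
    # Select the first 2 items in the group only (fewer if the group is smaller)
    for i in range(min(2, len(data))):
        city = data[i][1]   # city is the 2nd field of each input tuple
        low = data[i][3]    # low temperature is the 4th field
        result.append((city, low))
    return result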

Save this Python code to a file named getCitiesLow.py and run the following Pig script:

-- Register UDF
register './getCitiesLow.py' USING jython as udf;

-- Transforming data using UDF
s = foreach g generate group, udf.getCitiesLow(d);

-- Show the results
dump s;
(CA,{(San Jose,51),(Berkeley,48)})
(MA,{(Charlton,36),(Bedford,38)})
(WA,{(Tacoma,45),(Seattle,45)})
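Because the bag reaches the UDF as a list of tuples, the selection logic can also be tried in a plain Python interpreter before running it on a cluster. The sketch below is a stand-in for the UDF body without the Pig-provided decorator. The name get_cities_low is hypothetical, the sample bag is copied from the CA group shown earlier, and a slice is used so that groups with fewer than 2 rows do not raise an error.

```python
def get_cities_low(data):
    """Return (city, low) for at most the first 2 tuples in the bag."""
    result = []
    for row in data[:2]:
        # row layout: (state, city, high, low)
        result.append((row[1], row[3]))
    return result

# The CA group in the order it appeared in the dump output above
ca_bag = [
    ("CA", "San Jose", "73", "51"),
    ("CA", "Berkeley", "68", "48"),
    ("CA", "San Diego", "70", "58"),
    ("CA", "Irvine", "73", "54"),
]

print(get_cities_low(ca_bag))
# prints [('San Jose', '51'), ('Berkeley', '48')]
```

Keep in mind that a Pig bag is unordered, so which 2 rows count as "first" depends on the order in which Pig happens to deliver them.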

This example shows how easily you can handle a Pig bag of tuples in Python: it simply becomes a list of tuples that you can iterate over to extract individual items. You can also see how the input group can be transformed: in our example we selected only 2 rows from each group and returned a different number of columns. This can be useful in some advanced transformations.
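As one illustration of a slightly more involved aggregation, the following sketch returns the city with the lowest temperature in each group. The function name getColdestCity and its output schema are hypothetical and not part of the original example. It assumes Pig makes the outputSchema decorator available when the file is registered with jython; the fallback definition exists only so the file can be tried in a plain Python interpreter.

```python
try:
    outputSchema  # assumed to be supplied by Pig when registered USING jython
except NameError:
    # Fallback for local testing only (an assumption, not part of Pig)
    def outputSchema(schema):
        def decorate(func):
            func.outputSchema = schema
            return func
        return decorate

# Hypothetical UDF: returns a single (city, low) tuple per group
@outputSchema('coldest:(city:chararray,low:chararray)')
def getColdestCity(data):
    coldest = None
    coldest_low = None
    for row in data:
        # low was loaded as chararray, so convert it before comparing
        low = int(row[3])
        if coldest_low is None or low < coldest_low:
            coldest_low = low
            coldest = (row[1], row[3])
    # None is returned for an empty bag; on ties the first row seen wins
    return coldest
```

If this function were added to the registered file, it could be called in the same way as the first UDF, for example: foreach g generate group, udf.getColdestCity(d);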
