Performance Issues Using ORDER to Reduce the Number of Output Files – Apache Pig 0.16, Amazon EMR

Often you have a simple ETL process (a Pig job in our example) that just applies a filter to the source data, performs some calculations and saves the result to a target table.

So it is a Map-only job (no Reduce phase is required) that generates N output files, where N is the number of map tasks.

For example:

set mapreduce.output.fileoutputformat.compress true;

-- Read data from a Hive table
data_all = LOAD 'dmtolpeko.mcp' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Filter selects less than 2% of rows from the source table
data_filtered = FILTER data_all BY event_name == 'CLIENT_LOGIN'; 

-- Define out columns and save the results to S3
data_out = FOREACH data_filtered GENERATE app_id, payload, event_timestamp;
STORE data_out INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';   

From the log you can see that the job used 231 mappers:

INFO mapred.FileInputFormat: Total input paths to process : 250
INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil  - Total input paths (combined) to process : 231
INFO util.MapRedUtil: Total input paths (combined) to process : 231
INFO mapreduce.JobSubmitter: number of splits:231

And we can see that the output contains 231 data files (_SUCCESS is an empty file):

aws s3 ls s3://epic/hive/dmtolpeko.db/mcp_client_login/ --summarize

2018-03-01 22:22:33          0 _SUCCESS
2018-03-01 22:22:30      11165 part-m-00000.gz
2018-03-01 22:22:29      11107 part-m-00001.gz
2018-03-01 22:22:30      11346 part-m-00002.gz
...
2018-03-01 22:22:27       5697 part-m-00228.gz
2018-03-01 22:22:26       5686 part-m-00229.gz
2018-03-01 22:22:28       5480 part-m-00230.gz

Total Objects: 232
   Total Size: 1396533
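A quick back-of-the-envelope check using the totals reported above confirms just how small these objects are:

```python
total_size = 1396533   # total bytes reported by aws s3 ls --summarize
data_files = 231       # part-m-* files (excluding the empty _SUCCESS marker)

avg_size = total_size / data_files
print(round(avg_size))  # roughly 6 KB per file
```

Files this small are far below a typical HDFS/S3 block size, so every downstream job pays task-scheduling overhead per tiny file.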

This job produced a large number of very small files. What can we do to reduce the number of output files?

Using Python UDF to Aggregate Data in Apache Pig

Apache Pig allows you to use the GROUP statement to combine data by a key, and unlike SQL you do not need to apply an aggregation function such as SUM, MAX or MIN to return just a single row for each group. Pig simply groups the values for each key into separate bags that you can iterate over and transform as needed.

Let’s consider the following meteo data set containing state, city, annual high and annual low temperatures:

CA,San Diego,70,58
CA,San Jose,73,51
CA,Berkeley,68,48
CA,Irvine,73,54

Now we will group data by state and see the results:

-- Load input data from a file
d = load 's3://epic/dmtolpeko/meteo.txt' 
  using PigStorage(',') as (state:chararray, city:chararray, 
                            high:chararray, low:chararray);

-- Group data by state
g = group d by state; 

-- Show the results
dump g;
(CA,{(CA,San Jose,73,51),(CA,Berkeley,68,48),(CA,San Diego,70,58),(CA,Irvine,73,54)})

You can see that the rows are grouped by key, and unlike GROUP BY in SQL we do not need to apply an aggregate function.

Now let’s write a Python UDF that iterates over each group and returns just the first 2 rows with the city and low temperature values only.

Note that you can achieve the same result in pure Pig syntax; this example is just intended to show how you can handle bag items inside a Python UDF, which can be useful for implementing more complex transformations and aggregations.

# Pig UDF returns a bag of 2-element tuples
@outputSchema('result:bag{t:(city:chararray, low:chararray)}')
def getCitiesLow(data):
    result = []
    # Select the first 2 items in the group only
    for i in range(2):
        city = data[i][1]
        low = data[i][3]
        result.append((city, low))
    return result
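Since Pig passes a bag into Jython as a list of tuples, you can sanity-check the function with plain Python before registering it. The sample bag below mirrors the grouped CA rows shown earlier:

```python
# Pig passes the bag as a list of tuples (state, city, high, low)
def getCitiesLow(data):
    result = []
    # Select the first 2 items in the group only
    for i in range(2):
        city = data[i][1]
        low = data[i][3]
        result.append((city, low))
    return result

bag = [('CA', 'San Jose', '73', '51'), ('CA', 'Berkeley', '68', '48'),
       ('CA', 'San Diego', '70', '58'), ('CA', 'Irvine', '73', '54')]
print(getCitiesLow(bag))  # [('San Jose', '51'), ('Berkeley', '48')]
```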

Put this Python code into a file, say udf.py, and run the following Pig script:

-- Register the UDF file
register 'udf.py' using jython as udf;

-- Transforming data using UDF
s = foreach g generate group, udf.getCitiesLow(d);

-- Show the results
dump s;
(CA,{(San Jose,51),(Berkeley,48)})

From this example you can see how nicely you can handle a Pig bag of tuples in Python: it simply becomes a list of tuples that you can iterate over to extract individual items. You can also see how the input group can be transformed: in our example we selected only 2 rows from each group and returned a different number of columns. This can be useful in more advanced transformations.

Parsing JSON Columns in Apache Pig

Quite often you have to deal with data sets that contain JSON objects in some columns only. For example, consider the following event stream data containing JSON in payload column:

id timestamp type payload
1 2016-11-09 11:11:13.235 A {"a":"1","b":"2","c":"3"}
2 2016-11-09 13:18:34.000 A {"a":"1","b":"8","c":"0"}
3 2016-11-09 15:20:31.568 A {"a":"3","b":"7","c":"6"}

Parsing Simple JSON Objects

If the JSON data contains just key-value pairs, i.e. {"key":"value","key2":"value2",...}, you can use the JsonStringToMap UDF to extract the required values by key. For example:

-- JsonStringToMap is not a built-in UDF (it comes from the Elephant Bird library), so you have to register it
register '/usr/lib/pig/lib/*.jar';
define JsonStringToMap com.twitter.elephantbird.pig.piggybank.JsonStringToMap();

-- Load data
d = load 'data.txt' using PigStorage() 
  as (id:chararray, ts:chararray, type:chararray, payload:chararray);

-- Transform JSON object to Pig MAP
d1 = foreach d generate JsonStringToMap(payload) as payload;

-- Now you can work with JSON object as a Pig MAP and extract values
d2 = foreach d1 generate  payload#'a', payload#'b', payload#'c';

dump d2;

The last statement outputs the values of the a, b and c keys for every row:

(1,2,3)
(1,8,0)
(3,7,6)
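Conceptually, JsonStringToMap does for a flat JSON object what json.loads does in Python: it turns the string into a key-to-value map you can index. A minimal illustration using the sample payloads above:

```python
import json

payloads = ['{"a":"1","b":"2","c":"3"}',
            '{"a":"1","b":"8","c":"0"}',
            '{"a":"3","b":"7","c":"6"}']

for p in payloads:
    m = json.loads(p)          # the Pig MAP equivalent of JsonStringToMap
    print(m['a'], m['b'], m['c'])  # like payload#'a', payload#'b', payload#'c'
```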
Parsing JSON Arrays

Unfortunately, the JsonStringToMap UDF does not work with JSON arrays, i.e. data in the following format:

[{"key":"value"}, {"key2":"value2"}, ...]
There are a dozen custom UDFs written to work with JSON arrays, but I would like to show you how to write your own Python UDF to iterate over JSON arrays and extract any required data.

Let’s assume that the payload column now contains the following data (the exact values here are illustrative):

[{"a":"1"},{"b":"2"},{"a":"3"}]
[{"a":"4"},{"b":"5"}]
[{"a":"6","b":"7","c":"8"}]

Here we have 3 rows, and every row contains a JSON array. Note that the first array contains 3 JSON objects, the second array contains 2 objects, and the third array contains just one JSON object (with 3 key-value pairs).

Let’s write a Pig UDF in Python that returns the number of elements in the array and the last value of key a in each array:

import com.xhaus.jyson.JysonCodec as json

@outputSchema('result:(cnt:int, a:int)')
def getItems(data):
    cnt = 0
    val = 0
    jdata = json.loads(data)
    # Iterate items in the JSON array, each item is a JSON object
    for i in jdata:
        cnt += 1
        # Check whether key "a" exists in the current JSON object
        if "a" in i:
            val = int(i["a"])
    return (cnt, val)
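Pig ships the Jyson codec for Jython, but outside Pig you can sanity-check the same logic with the standard json module; a sketch using a couple of illustrative arrays:

```python
import json

def getItems(data):
    cnt = 0
    val = 0
    # json.loads plays the role of JysonCodec inside the Pig UDF
    jdata = json.loads(data)
    for i in jdata:
        cnt += 1
        # Remember the last value of key "a" seen in the array
        if "a" in i:
            val = int(i["a"])
    return (cnt, val)

print(getItems('[{"a":"1"},{"b":"2"},{"a":"3"}]'))  # (3, 3)
print(getItems('[{"a":"6","b":"7","c":"8"}]'))      # (1, 6)
```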

Now we can invoke our UDF as follows (assuming the code was saved to a file named getitems.py):

register 'getitems.py' using jython as gi;

d1 = foreach d generate gi.getItems(payload);

dump d1;

The last statement outputs the number of items in the array and the last value of a for every row.
You can also extend the Python UDF to parse JSON structures of any complexity.
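As a sketch of that idea, here is a hypothetical pure-Python helper (the name and structure are my own, not from the article) that recursively collects every value stored under a given key, no matter how deeply the objects and arrays are nested:

```python
import json

def collect_key(node, key):
    """Recursively gather all values stored under `key` in nested JSON."""
    found = []
    if isinstance(node, dict):
        for k, v in node.items():
            if k == key:
                found.append(v)
            # Descend into the value in case it nests further objects/arrays
            found.extend(collect_key(v, key))
    elif isinstance(node, list):
        for item in node:
            found.extend(collect_key(item, key))
    return found

data = json.loads('[{"a":"1","nested":{"a":"2","list":[{"a":"3"}]}}]')
print(collect_key(data, "a"))  # ['1', '2', '3']
```

Inside Pig the same function body would work under Jython with JysonCodec doing the parsing, with an @outputSchema declaring a bag of values.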