Often you have a simple ETL process (a Pig job in our example) that just applies a filter to the source data, performs some calculations and saves the result to a target table.
So it is a Map-only job (no Reduce phase is required) that generates N output files, where N is the number of map tasks.
set mapreduce.output.fileoutputformat.compress true

-- Read data from a Hive table
data_all = LOAD 'dmtolpeko.mcp' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Filter selects less than 2% of rows from the source table
data_filtered = FILTER data_all BY event_name == 'CLIENT_LOGIN';

-- Define output columns and save the results to S3
data_out = FOREACH data_filtered GENERATE app_id, payload, event_timestamp;

STORE data_out INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';
From the log you can see that the job used 231 mappers:
INFO mapred.FileInputFormat: Total input paths to process : 250
INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 231
INFO util.MapRedUtil: Total input paths (combined) to process : 231
INFO mapreduce.JobSubmitter: number of splits:231
And we can see that the output contains 231 data files (_SUCCESS is an empty file):
aws s3 ls s3://epic/hive/dmtolpeko.db/mcp_client_login/ --summarize

2018-03-01 22:22:33          0 _SUCCESS
2018-03-01 22:22:30      11165 part-m-00000.gz
2018-03-01 22:22:29      11107 part-m-00001.gz
2018-03-01 22:22:30      11346 part-m-00002.gz
...
2018-03-01 22:22:27       5697 part-m-00228.gz
2018-03-01 22:22:26       5686 part-m-00229.gz
2018-03-01 22:22:28       5480 part-m-00230.gz

Total Objects: 232
   Total Size: 1396533
This job produced a large number of very small files: 231 files averaging only about 6 KB each. What can we do to reduce the number of output files?
You can modify the job to add a Reduce phase, for example, by adding an ORDER statement. This decreases the performance of the ETL process but produces a much smaller number of output files.
...
data_out_ordered = ORDER data_out BY event_timestamp;
STORE data_out_ordered INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';
Now this job produces only one 1.5 MB file:
aws s3 ls s3://epic/hive/dmtolpeko.db/mcp_client_login/ --summarize --human-readable

2018-03-02 14:20:38    0 Bytes _SUCCESS
2018-03-02 14:20:37    1.5 MiB part-r-00000.gz

Total Objects: 2
   Total Size: 1.5 MiB
ORDER Internals in Apache Pig
Are there any issues with this approach? Let’s investigate how ORDER works in Apache Pig.
Without ORDER we had a single MapReduce job; with ORDER, Pig always runs 3 (!) MapReduce jobs. It is theoretically possible to sort the data with a single MapReduce job (using 1 Reducer), but Pig uses 3 jobs instead.
First MR Job for ORDER
In the first job Pig applies filters to the source data and writes the result to HDFS:
Note the following issue: the job read 17 GB of data from the source table and wrote 200 MB to HDFS, while the final result is only 1.5 MB.
The problem here is that the intermediate result is uncompressed by default, which is why 200 MB were written instead of 1.5 MB. Let's add the following options to the Pig script and restart the job:
set pig.tmpfilecompression true
set pig.tmpfilecompression.codec gz
You can see that the result is much better now:
Second MR Job for ORDER
The second job is the SAMPLER. It reads all (!) of the data written by the first job; each mapper selects some rows (100 by default) and sends them to a single Reducer that determines the ranges of values for the sort column(s).
Pig does this to enable a distributed sort using multiple reducers instead of using only one reducer to sort the data.
Although this requires reading the filtered data twice, it can still improve the overall performance by distributing the sort work among multiple reducers.
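The idea behind the sampler can be sketched in a few lines of Python. This is a simplified illustration, not Pig's actual implementation (Pig's weighted range partitioner also accounts for key skew): pick boundary keys from a sorted sample so that each reducer gets a roughly equal share of rows.

```python
# Simplified sketch of sample-based range partitioning (illustrative,
# not Pig's actual code). From a small sample of sort keys, pick
# num_reducers - 1 quantile boundaries that split the key space into
# roughly equal-sized ranges, one range per reducer.

def compute_boundaries(sample, num_reducers):
    """Return num_reducers - 1 boundary keys taken from the sample."""
    keys = sorted(sample)
    step = len(keys) / num_reducers
    # Boundary i separates the range for reducer i - 1 from reducer i
    return [keys[int(step * i)] for i in range(1, num_reducers)]

# A toy sample of sort-column values collected by the mappers
sample = [3, 17, 8, 42, 25, 1, 36, 12, 29, 20, 5, 33]
boundaries = compute_boundaries(sample, num_reducers=3)
print(boundaries)  # prints [12, 29]
```

With 3 reducers, two boundary keys are enough: each reducer handles one contiguous range of the sort key, so concatenating the reducer outputs in order yields a globally sorted result.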
Third MR Job for ORDER
The third job now knows how many reducers to start and how to partition the data by the sort columns, so it can sort the data in a distributed manner. The number of output files is equal to the number of reducers.
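The partitioning step of the third job can be illustrated with a short Python sketch (an assumed simplification, not Pig's implementation): each row is routed to a reducer by a binary search of its sort key against the boundary keys produced by the sampler.

```python
import bisect

# Illustrative partitioner: route a row to a reducer by binary search
# over the sampled boundary keys. With boundaries [12, 29], keys <= 12
# go to reducer 0, keys in (12, 29] to reducer 1, and the rest to
# reducer 2, so each reducer sorts one contiguous key range and the
# concatenated reducer outputs are globally ordered.

def reducer_for(key, boundaries):
    return bisect.bisect_left(boundaries, key)

boundaries = [12, 29]  # hypothetical boundaries from the sampler job
print([reducer_for(k, boundaries) for k in [5, 12, 20, 40]])  # [0, 0, 1, 2]
```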
So it is possible to use ORDER in Apache Pig to reduce the number of output files for a Pig job, but this comes at the cost of allocating resources for and running 3 MapReduce jobs instead of one.