Often you have a simple ETL process (a Pig job in our example) that just applies a filter to the source data, performs some calculations and saves the result to a target table.
So it is a Map-only job (no Reduce phase is required) that generates N output files, where N is the number of map tasks.
For example:
set mapreduce.output.fileoutputformat.compress true

-- Read data from a Hive table
data_all = LOAD 'dmtolpeko.mcp' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Filter selects less than 2% of rows from the source table
data_filtered = FILTER data_all BY event_name == 'CLIENT_LOGIN';

-- Define output columns and save the results to S3
data_out = FOREACH data_filtered GENERATE app_id, payload, event_timestamp;

STORE data_out INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';
From the log you can see that the job used 231 mappers:
INFO mapred.FileInputFormat: Total input paths to process : 250
INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 231
INFO util.MapRedUtil: Total input paths (combined) to process : 231
INFO mapreduce.JobSubmitter: number of splits:231
And we can see that the output contains 231 data files (_SUCCESS is an empty file):
aws s3 ls s3://epic/hive/dmtolpeko.db/mcp_client_login/ --summarize

2018-03-01 22:22:33          0 _SUCCESS
2018-03-01 22:22:30      11165 part-m-00000.gz
2018-03-01 22:22:29      11107 part-m-00001.gz
2018-03-01 22:22:30      11346 part-m-00002.gz
...
2018-03-01 22:22:27       5697 part-m-00228.gz
2018-03-01 22:22:26       5686 part-m-00229.gz
2018-03-01 22:22:28       5480 part-m-00230.gz

Total Objects: 232
Total Size: 1396533
This job produced a large number of very small files. What can we do to reduce the number of output files?
You can modify the job and add a Reduce phase, for example, by adding an ORDER statement. This decreases the performance of the ETL process but produces a much smaller number of output files.
...
data_out_ordered = ORDER data_out BY event_timestamp;
STORE data_out_ordered INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';
Now this job produces only one 1.5 MB file:
aws s3 ls s3://epic/hive/dmtolpeko.db/mcp_client_login/ --summarize --human-readable

2018-03-02 14:20:38    0 Bytes _SUCCESS
2018-03-02 14:20:37    1.5 MiB part-r-00000.gz

Total Objects: 2
Total Size: 1.5 MiB
ORDER Internals in Apache Pig
Are there any issues with this approach? Let’s investigate how ORDER works in Apache Pig.
Without ORDER we had a single MapReduce job, while with ORDER Pig always runs 3 (!) MapReduce jobs. It is theoretically possible to sort the data with a single MapReduce job (using 1 reducer), but Pig uses 3 jobs instead.
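You can check this yourself with Pig's EXPLAIN operator, which prints the logical, physical and MapReduce plans for a relation; the MapReduce plan section lists the jobs generated for ORDER. A minimal sketch using the relations from the example above:

-- Print the plans Pig compiles for the ordered relation; the MapReduce
-- plan at the end shows the separate jobs generated for ORDER
data_out_ordered = ORDER data_out BY event_timestamp;
EXPLAIN data_out_ordered;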
First MR Job for ORDER
In the first job Pig applies the filter to the source data and writes the result to HDFS.
Note the following issue: the job read 17 GB of data from the source table and wrote 200 MB to HDFS, while the final result is only 1.5 MB.
The problem is that the intermediate result is uncompressed by default, which is why 200 MB were written instead of 1.5 MB. Let's add the following options to the Pig script and restart the job:
set pig.tmpfilecompression true
set pig.tmpfilecompression.codec gz
You can see that the result is much better now.
Second MR Job for ORDER
The second job is the SAMPLER job. It reads all (!) of the data written by the first job; each mapper selects some rows (100 by default) and sends them to a single reducer that defines the range of values for the sort column(s).
Pig does this to enable a distributed sort using multiple reducers instead of using only one reducer to sort the data.
Although this requires reading the filtered data twice, it can still improve the overall performance by distributing the sort among multiple reducers.
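As a rough illustration of what the sampler computes (a conceptual sketch only, not Pig's internal implementation), you can approximate it with Pig's SAMPLE operator and the built-in MIN/MAX functions:

-- Conceptual sketch: take a small random sample of the filtered data and
-- compute the range of the sort key, similar to what the sampler's single
-- reducer does when it builds the partition boundaries for the sort
data_sample = SAMPLE data_out 0.01;
sample_all = GROUP data_sample ALL;
key_range = FOREACH sample_all GENERATE
    MIN(data_sample.event_timestamp) AS min_ts,
    MAX(data_sample.event_timestamp) AS max_ts;
DUMP key_range;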
Third MR Job for ORDER
The third job knows how many reducers to start and how to partition the data by the sort columns, so it can perform the distributed sort. The number of output files is equal to the number of reducers.
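If a single output file is too restrictive, you can also control the number of reducers for the sort explicitly with the PARALLEL clause (or the default_parallel property), which in turn controls the number of output files. A sketch based on the example script; the value 4 is just an illustration:

-- Run the distributed sort with 4 reducers, which produces 4 output files
-- (4 is an arbitrary example value)
data_out_ordered = ORDER data_out BY event_timestamp PARALLEL 4;
STORE data_out_ordered INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';

-- Alternatively, set the default parallelism for all reduce phases in the script
-- set default_parallel 4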
Summary
It is possible to use ORDER in Apache Pig to reduce the number of output files for a Pig job, but it requires allocating resources and running 3 MapReduce jobs.