Performance Issues Using ORDER to Reduce the Number of Output Files – Apache Pig 0.16 on Amazon EMR

Often you have a simple ETL process (a Pig job in our example) that just applies a filter to the source data, performs some calculations and saves the result to a target table.

So it is a Map-only job (no Reduce phase is required) that generates N output files, where N is the number of map tasks.

For example:

set mapreduce.output.fileoutputformat.compress true;

-- Read data from a Hive table
data_all = LOAD 'dmtolpeko.mcp' USING org.apache.hive.hcatalog.pig.HCatLoader();

-- Filter selects less than 2% of rows from the source table
data_filtered = FILTER data_all BY event_name == 'CLIENT_LOGIN'; 

-- Define out columns and save the results to S3
data_out = FOREACH data_filtered GENERATE app_id, payload, event_timestamp;
STORE data_out INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';   

From the log you can see that the job used 231 mappers:

INFO mapred.FileInputFormat: Total input paths to process : 250
INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil  - Total input paths (combined) to process : 231
INFO util.MapRedUtil: Total input paths (combined) to process : 231
INFO mapreduce.JobSubmitter: number of splits:231
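
Note that Pig combined the 250 input paths into 231 splits: split combination is enabled by default, and the maximum size of a combined split is configurable. Raising this limit is one more lever for cutting the number of mappers (and therefore the number of output files); a minimal sketch, where the 512 MB value is an illustrative assumption, not a setting from this job:

set pig.splitCombination true;            -- enabled by default
set pig.maxCombinedSplitSize 536870912;   -- combine input up to 512 MB per split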

And we can see that the output contains 231 data files (_SUCCESS is an empty file):

aws s3 ls s3://epic/hive/dmtolpeko.db/mcp_client_login/ --summarize

2018-03-01 22:22:33          0 _SUCCESS
2018-03-01 22:22:30      11165 part-m-00000.gz
2018-03-01 22:22:29      11107 part-m-00001.gz
2018-03-01 22:22:30      11346 part-m-00002.gz
...
2018-03-01 22:22:27       5697 part-m-00228.gz
2018-03-01 22:22:26       5686 part-m-00229.gz
2018-03-01 22:22:28       5480 part-m-00230.gz

Total Objects: 232
   Total Size: 1396533

This job produced a large number of very small files: 231 files holding less than 1.4 MB of data in total. What can we do to reduce the number of output files?
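
As the title suggests, one option is to add an ORDER statement: it forces a Reduce phase, and its PARALLEL clause sets the number of reducers and therefore the number of output files. A minimal sketch of this approach (the sort column and PARALLEL 1 are illustrative assumptions, not taken from the original script):

-- Force a Reduce phase; with PARALLEL 1 the job writes a single output file
data_sorted = ORDER data_out BY event_timestamp PARALLEL 1;
STORE data_sorted INTO 's3://epic/hive/dmtolpeko.db/mcp_client_login/';

Keep in mind that ORDER is not free: Pig runs an extra sampling job before the sort to build balanced range partitions, which is presumably the performance issue the title refers to.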

Access Amazon S3 from Spark

The first step is to specify the AWS Hadoop libraries when launching PySpark:

./bin/pyspark --packages org.apache.hadoop:hadoop-aws:2.7.1

Then before you can access objects on Amazon S3, you have to specify your access keys:

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "<key>")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey","<key>")

Now let’s open a file and calculate the number of rows in it:

f = sc.textFile("s3n://epic/dmtolpeko/fs_sum.txt")
f.count()

For my sample file the result is as follows:

199