Map-side Aggregation in Hive

In MapReduce, you usually use map tasks to filter out records and reduce tasks to perform aggregations. But when you need to aggregate a large number of rows, sending them all to the reducers can cause very high network I/O. Fortunately, Hive is capable of performing map-side aggregation whenever possible.

Consider a query to calculate the total number of orders per store and product:

SELECT store, product, COUNT(*) AS cnt
FROM orders
GROUP BY store, product;

Without map-side aggregation, all rows have to be sent to the reducers, causing high network I/O. Hive provides the hive.map.aggr option, set to true by default, that specifies whether to use map-side aggregation in GROUP BY.
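You can toggle this behavior per session to compare both execution modes. A minimal sketch using the query above:

-- Map-side aggregation on (the default): map tasks emit partial aggregates
SET hive.map.aggr = true;
SELECT store, product, COUNT(*) AS cnt FROM orders GROUP BY store, product;

-- Map-side aggregation off: every row is shuffled to the reducers
SET hive.map.aggr = false;
SELECT store, product, COUNT(*) AS cnt FROM orders GROUP BY store, product;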

Here are some statistics of the query execution for a sample orders table:

[Screenshot: MapReduce job counters for the GROUP BY query]

The query does not contain a WHERE clause, and you can see that while the number of input rows is 1,075,477,972 (Map input records), the number of map output rows is just 7,370,737. That’s quite a good result, as the map tasks were able to reduce the number of rows from about 1.1 billion to 7.3 million, roughly a 146x reduction.

Only 7,370,737 rows had to be sent to the reducers, which finally reduced them to 1,657,744 result rows.

To perform map-side aggregation, Hive uses an in-memory hash table to hold the aggregate values. Besides hive.map.aggr, Hive offers the following options to configure map-side aggregation (an example of setting them follows the list):

  • hive.map.aggr.hash.force.flush.memory.threshold (Default: 0.9) – When the size of the hash table exceeds this threshold, it is flushed. This means that for some keys a map task can produce multiple partial counts.
  • hive.map.aggr.hash.percentmemory (Default: 0.5) – Percent of total map task memory that can be used for the hash table.
  • hive.map.aggr.hash.min.reduction (Default: 0.5) – Ratio between the hash table size and the number of input rows at which map-side aggregation is turned off. If the keys are mostly unique within each map task, there is no value in map-side aggregation.
  • hive.groupby.mapaggr.checkinterval (Default: 100,000) – The number of rows after which Hive checks the number of entries in the hash table to decide whether map-side aggregation is still worthwhile.
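For reference, here is how these settings can be adjusted at the session level before running the query. This is just a sketch; the values simply repeat the defaults listed above:

SET hive.map.aggr = true;                                   -- enable map-side aggregation
SET hive.map.aggr.hash.percentmemory = 0.5;                 -- hash table may use up to 50% of map task memory
SET hive.map.aggr.hash.force.flush.memory.threshold = 0.9;  -- flush the hash table when it is 90% full
SET hive.map.aggr.hash.min.reduction = 0.5;                 -- disable map-side aggregation if it does not halve the rows
SET hive.groupby.mapaggr.checkinterval = 100000;            -- re-check the hash table after every 100K input rows

SELECT store, product, COUNT(*) AS cnt
FROM orders
GROUP BY store, product;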

SELECT COUNT(*) on SequenceFiles and Text

What happens when you run a SELECT COUNT(*) statement for a table stored in the SequenceFile or text file format in Hive? How many resources does this operation require?

SELECT COUNT(*) FROM orders;

Let’s run the query on a table containing about 1.1 billion rows (1,075,477,972 rows) to investigate the details. In my example, 242 mappers and 1 reducer were launched, and the job result is as follows:

Job 0: Map: 242  Reduce: 1   Cumulative CPU: 2763.69 sec   HDFS Read: 80211919808 HDFS Write: 11 SUCCESS
Total MapReduce CPU Time Spent: 46 minutes 3 seconds 690 msec
OK
1075477972
Time taken: 70.251 seconds, Fetched: 1 row(s)

First, you can see that mappers had to read all table data from HDFS (80 GB), and only the final count was written to HDFS (11 bytes):

[Screenshot: job counters showing HDFS bytes read and written]

The good news is that each mapper calculated its own local row count, so the reducer just had to sum 242 counts to get the final row count:

[Screenshot: reducer counters showing 242 input records]

So when calculating the number of rows in a table stored in SequenceFile or text format, remember that the map tasks will read all table rows from HDFS, and this operation can be quite expensive depending on the table size.

At the same time, the reducer does not need to process all the rows: it just receives a local count from each mapper, so network I/O between the mappers and the reducer is very low, and the reduce step is cheap no matter how many rows the table contains.

Note that if your query includes a WHERE condition, the statement still requires the same I/O resources to execute:

SELECT COUNT(*) FROM orders WHERE code = 'A1';

Mappers still need to read all data from HDFS, and the code = 'A1' condition just affects their local row counts (plus some additional CPU time to compare values). The reducer still needs to read the local row counts from 242 mappers and sum them.
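You can confirm this with EXPLAIN; the exact plan output varies by Hive version, but it should show a TableScan operator over the full table, a Filter Operator for the condition, and a Group By Operator computing the partial count on the map side:

EXPLAIN
SELECT COUNT(*) FROM orders WHERE code = 'A1';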

ORCFile Table Scan Performance

Let’s evaluate the performance of a table scan operation using various file formats available in Hive. I have a table containing 32M rows, and I need to execute a simple query that returns only 43 rows:

SELECT * FROM channels WHERE code = 'NOT APPLICABLE';

The size of the table when stored in different formats with different compression codecs is as follows (sample DDL for creating these tables appears after the table):

Format        Compression   Size
Text          Uncompressed  13.1 GB
SequenceFile  Uncompressed  12.3 GB
SequenceFile  Snappy        1.3 GB
ORCFile       Snappy        561 MB
ORCFile       ZLIB          341 MB
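For reference, here is one way to declare such tables in Hive. This is a sketch assuming an existing channels source table; the table names are illustrative:

-- ORCFile with ZLIB compression
CREATE TABLE channels_orc_zlib
STORED AS ORC
TBLPROPERTIES ("orc.compress" = "ZLIB")
AS SELECT * FROM channels;

-- SequenceFile with Snappy compression (the codec is set at the session level)
SET hive.exec.compress.output = true;
SET mapred.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec;
CREATE TABLE channels_seq_snappy
STORED AS SEQUENCEFILE
AS SELECT * FROM channels;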

You can see how compression, and especially the ORCFile format, reduces the storage size. Now let’s run the query and see how it performs.

SequenceFile Snappy

First, let’s query the table stored in SequenceFile format with Snappy compression:

SELECT * FROM channels WHERE code = 'NOT APPLICABLE';

Data in HDFS:

[Screenshot: table files in HDFS]

9 map tasks were launched, each completing within 15-30 seconds:

[Screenshot: map task execution times]

Looking at the log of the fastest mapper, we can see that it read and processed 3M rows:

[Screenshot: mapper log showing 3M rows read and processed]

The job read 1.2 GB of data from HDFS, which corresponds to the full SequenceFile size:

[Screenshot: HDFS bytes read counter]

ORCFile ZLIB

Now let’s query the table containing data in ORCFile format with ZLIB compression:

SELECT * FROM channels WHERE code = 'NOT APPLICABLE';

Data in HDFS:

[Screenshot: table files in HDFS]

A single map task was launched, and it completed in 12 seconds:

[Screenshot: map task execution time]

The mapper log shows that it processed only 1,000 rows:

[Screenshot: mapper log showing 1,000 rows processed]

Note that although the table size is 341 MB, only 86 MB were read from HDFS:

[Screenshot: HDFS bytes read counter]

The ORCFile format not only reduces the storage size, but also helps avoid a full table scan. Its built-in indexes let Hive determine which blocks of data to read and which to skip. As you can see, ORCFile can dramatically reduce the resources required to perform a table scan.
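To make sure these indexes are actually used for filtering, predicate pushdown has to be enabled. A minimal sketch (hive.optimize.index.filter is a standard Hive setting, though its default value has varied across versions):

-- Enable ORC predicate pushdown so the row-group indexes can skip data
SET hive.optimize.index.filter = true;
SELECT * FROM channels WHERE code = 'NOT APPLICABLE';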