Map-side Aggregation in Hive

In MapReduce, you usually use map tasks to filter records and reduce tasks to perform aggregations. But when you need to aggregate a large number of rows, doing all of the work in reducers can lead to very high network I/O, because every row has to be shuffled to a reducer. Fortunately, Hive can perform map-side aggregation whenever possible.

Consider a query to calculate the total number of orders per store and product:

SELECT store, product, count(*) AS cnt 
FROM orders
GROUP BY store, product;

Without map-side aggregation, all rows have to be sent to the reducers, causing high network I/O. Hive has an option, set to true by default, that specifies whether to use map-side aggregation in GROUP BY queries.
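A quick way to see the effect is to toggle the setting for a session and compare the query plans. The sketch below assumes the option in question is hive.map.aggr, the name listed in the Hive configuration reference; the name itself is not given in the text above, so treat it as an assumption and check it against your Hive version.

```sql
-- Assumption: hive.map.aggr is the option described above (default: true).
SET hive.map.aggr=true;

-- With map-side aggregation on, the map stage of the plan is expected to
-- contain its own Group By Operator that builds partial aggregates.
EXPLAIN
SELECT store, product, count(*) AS cnt
FROM orders
GROUP BY store, product;

-- Turn it off for comparison: every input row is then shuffled to the reducers.
SET hive.map.aggr=false;

EXPLAIN
SELECT store, product, count(*) AS cnt
FROM orders
GROUP BY store, product;
```

Running the query itself under both settings and comparing the Map output records counter should show how many rows the map-side aggregation saves.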

Here are some statistics from the query execution on a sample orders table:


The query does not contain a WHERE clause, and you can see that while the number of input rows is 1,075,477,972 (Map input records), the number of map output rows is just 7,370,737. That’s quite a good result: the map tasks reduced the number of rows from about 1.08 billion to about 7.4 million, roughly a 146-fold reduction.

Only 7,370,737 rows have to be sent to the reducers, which finally reduce them to 1,657,744 rows.
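The reducers can finish the job because partial counts for the same key simply add up. The query below is a purely illustrative sketch of that two-step logic, not something you need to write yourself, since Hive does it internally. It uses Hive's INPUT__FILE__NAME virtual column as a stand-in for "one map task", which is an assumption (a large file may be read by several map tasks), and the aliases src_file, local_cnt and partial_counts are hypothetical.

```sql
-- Illustration only: partial counts per input file, then a final merge.
SELECT store, product, SUM(local_cnt) AS cnt
FROM (
  -- Step 1: local counts, analogous to what each map task emits.
  SELECT store, product, INPUT__FILE__NAME AS src_file, COUNT(*) AS local_cnt
  FROM orders
  GROUP BY store, product, INPUT__FILE__NAME
) partial_counts
-- Step 2: merge the local counts per key, analogous to the reduce phase.
GROUP BY store, product;
```

The result should match the original GROUP BY query, because summing the local counts for a (store, product) pair gives the same total as counting its rows directly.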

To perform map-side aggregation, Hive uses an in-memory hash table to hold the aggregate values. Hive also offers the following options to configure map-side aggregation:

  • (Default: 0.9) – When the size of the hash table exceeds this threshold, it is flushed. This means that for some keys a map task can produce multiple local counts.
  • (Default: 0.5) – Fraction of the total map task memory that can be used for the hash table.
  • (Default: 0.5) – If the ratio of hash table size to input size exceeds this value, map-side aggregation is turned off. If input keys are unique (or nearly so) within each map task, there is no value in map-side aggregation.
  • hive.groupby.mapaggr.checkinterval (Default: 100,000) – After this number of rows, Hive checks the number of items in the hash table.
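As a rough sketch, these options can be set per session before running the query. Only hive.groupby.mapaggr.checkinterval is named in the list above; the other three property names are taken from the Hive configuration reference and matched to the unnamed entries by their defaults and descriptions, so treat that mapping as an assumption. The values shown are simply the defaults quoted above.

```sql
-- Assumed name: flush the hash table once it uses this share of memory.
SET hive.map.aggr.hash.force.flush.memory.threshold=0.9;

-- Assumed name: fraction of map task memory available to the hash table.
SET hive.map.aggr.hash.percentmemory=0.5;

-- Assumed name: turn map-side aggregation off when the ratio of hash table
-- entries to input rows is above this value.
SET hive.map.aggr.hash.min.reduction=0.5;

-- Named in the list above: rows processed before the hash table is checked.
SET hive.groupby.mapaggr.checkinterval=100000;

SELECT store, product, count(*) AS cnt
FROM orders
GROUP BY store, product;
```

If the grouping keys are close to unique, lowering the reduction ratio makes Hive give up on the hash table sooner; if they repeat heavily, as in the orders example, the defaults are a reasonable starting point.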
