In MapReduce, you usually use map tasks to filter records and reduce tasks to perform aggregations. But when you need to aggregate a large number of rows, doing all the work in reducers can lead to very high network I/O, because every row has to be shuffled across the network. Fortunately, Hive can perform map-side aggregation whenever possible.
Consider a query to calculate the total number of orders per store and product:
SELECT store, product, count(*) AS cnt FROM orders GROUP BY store, product;
Without map-side aggregation, all rows have to be sent to the reducers, causing high network I/O. Hive has the hive.map.aggr option, set to true by default, that specifies whether to use map-side aggregation in GROUP BY queries.
Here are some statistics from a sample execution of the query.
The query does not contain a WHERE clause, and you can see that while the number of input rows is 1,075,477,972 (Map input records), the number of map output rows is just 7,370,737. That’s a good result: the map tasks reduced the number of rows from about 1.08 billion to about 7.4 million, roughly a 146-fold reduction.
Only those 7,370,737 rows had to be sent to the reducers, which merged them into the final 1,657,744 rows.
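To make the row counts above more concrete, here is a minimal Python sketch of the idea, not Hive's actual implementation. Each map task counts rows per (store, product) key locally, so only one partial count per distinct key leaves the task, and the reducers merge the partial counts. The function names and the tiny input splits are hypothetical.

```python
from collections import Counter

def map_side_counts(rows):
    """One map task: count rows per (store, product) key locally."""
    return Counter((store, product) for store, product in rows)

def reduce_counts(partials):
    """Reducers: merge the partial counts produced by all map tasks."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

# Hypothetical input: two map tasks, each reading its own split of `orders`.
split_1 = [("s1", "apple"), ("s1", "apple"), ("s1", "pear"), ("s2", "apple")]
split_2 = [("s1", "apple"), ("s2", "apple"), ("s2", "apple")]

partials = [map_side_counts(split_1), map_side_counts(split_2)]
rows_in = len(split_1) + len(split_2)          # analogous to map input records
rows_shuffled = sum(len(p) for p in partials)  # analogous to map output records
result = reduce_counts(partials)               # final GROUP BY result
```

In this toy run, 7 input rows become 5 shuffled rows and 3 final groups. The same key can still arrive from several map tasks, which is why the reducers in the real query received more rows than they finally produced.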
To perform map-side aggregation, Hive uses an in-memory hash table to hold aggregate values. Besides hive.map.aggr, Hive offers the following options to configure map-side aggregation:
hive.map.aggr.hash.force.flush.memory.threshold (Default: 0.9) – When the memory used by the hash table exceeds this threshold, the table is flushed and its partial aggregates are emitted. This means that for some keys a map task can produce multiple local counts, which the reducers then merge.
hive.map.aggr.hash.percentmemory (Default: 0.5) – Fraction of total map task memory that can be used for the hash table.
hive.map.aggr.hash.min.reduction (Default: 0.5) – Ratio of the number of hash table entries to the number of input rows above which map-side aggregation is turned off. If input keys are unique (or nearly unique) within each map task, there is no value in map-side aggregation.
hive.groupby.mapaggr.checkinterval (Default: 100,000) – After this number of rows, Hive compares the number of entries in the hash table with the number of input rows to decide whether map-side aggregation is still worthwhile.
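The interaction of these options can be sketched in Python as follows. This is a simplified simulation under stated assumptions, not Hive's code: `map_task` is a hypothetical function, `check_interval` and `min_reduction` loosely mirror hive.groupby.mapaggr.checkinterval and hive.map.aggr.hash.min.reduction, and `max_entries` is an invented stand-in for the memory-based limits (Hive measures memory, not entry counts).

```python
def map_task(keys, check_interval=4, min_reduction=0.5, max_entries=3):
    """Simulate one map task and return the (key, partial_count) rows it emits."""
    table = {}        # in-memory hash table: key -> partial count
    emitted = []      # rows sent on to the reducers
    hash_aggr = True  # whether map-side aggregation is still enabled
    new_entries = 0   # distinct hash table entries created so far

    def flush():
        # Emit every partial count and empty the hash table.
        emitted.extend(table.items())
        table.clear()

    for rows_seen, key in enumerate(keys, start=1):
        if not hash_aggr:
            emitted.append((key, 1))  # aggregation is off: pass the row through
            continue
        if key not in table:
            new_entries += 1
        table[key] = table.get(key, 0) + 1

        # Stand-in for the memory threshold: flush when the table grows too big.
        if len(table) > max_entries:
            flush()
        # Periodic check: too little reduction means aggregation is not worth it.
        if rows_seen % check_interval == 0 and new_entries > rows_seen * min_reduction:
            hash_aggr = False
            flush()

    flush()  # emit whatever is left at the end of the task
    return emitted

repetitive = map_task(["a", "b", "a", "a", "b", "a", "b", "b"])
unique = map_task(["k%d" % i for i in range(1, 9)])
flushed = map_task(["a", "b", "c", "d", "a"], check_interval=100)
```

With repetitive keys the task emits one row per key. With unique keys the periodic check turns aggregation off and every row is passed through. In the third call the early flush makes the task emit two separate partial counts for the key "a", which is the "multiple local counts" effect described above.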