Invoking a Stateful UDF at the Map and Reduce Side in Hive

Typically, a scalar UDF processes only one or more columns from the current row. For example, the UPPER(col) function does not need to store the result of a previous call or access previous rows.

How do you write and correctly call a function that needs to store its state? For example, let's write a ROWNUM function that assigns sequential numbers to all rows in a table, no matter how many mappers were launched to read the data.

The simplest possible UDF:

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

@UDFType(deterministic = false)
public class Rownum2 extends UDF {
  private Long rowNum = 0L;   // the counter keeps its value between evaluate() calls within the same task (JVM instance)

  public Long evaluate() {
    rowNum++;
    return rowNum;
  }
}

Note that it is important to specify @UDFType(deterministic = false); otherwise Hive treats the function as deterministic, calls it only once, and reuses the result for every row. Compile the program and create a jar:

javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Rownum2.java
jar cf Rownum2.jar Rownum2.class

Now launch Hive and create the function:

ADD JAR /home/v-dtolpeko/Rownum2.jar;
CREATE TEMPORARY FUNCTION rownum2 AS 'Rownum2';

The ROWNUM2 function is available until the end of the current session.
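
A temporary function has to be recreated in every new session. On Hive 0.13 and later you can instead register it as a permanent function backed by a jar in HDFS; the path below is only a placeholder for wherever you copy Rownum2.jar:

CREATE FUNCTION rownum2 AS 'Rownum2' USING JAR 'hdfs:///user/hive/udf/Rownum2.jar';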

Map-Side UDF Execution

Let’s now run the following query on a sample table having over 30M rows:

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM channel;

It is important to notice how many mappers were launched for the query:

Hadoop job information for Stage-1: number of mappers: 7; number of reducers: 0

Now you can check how many rows are in the table and what the minimum and maximum values in the RN column are:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel2;
32093903  1        6873300

You can see that the assigned maximum sequential number is not equal to the total number of rows in the table. Investigating further:

SELECT rn, COUNT(*) FROM channel2 GROUP BY rn LIMIT 10;
1       7
2       7
3       7
4       7
5       7
6       7
7       7
8       7
9       7
10      7

You can see that each row number was assigned as many times as there were mappers, i.e. the ROWNUM function was executed on the map side and its counter was local to each mapper. The EXPLAIN statement confirms this:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM channel;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
        Select Operator
          expressions: name (type: string), code (type: string), rownum2() (type: bigint)

Reduce-Side UDF Execution

So how can we assign unique row numbers for the entire table? Let's see how the following query works:

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;

I used the constant 1 in DISTRIBUTE BY and SORT BY since there are no partitions in the table and I want to assign unique row numbers across the entire set.
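
As a side note, DISTRIBUTE BY x SORT BY x on the same expression can usually be abbreviated as CLUSTER BY x; assuming Hive accepts a constant expression there the same way it does in DISTRIBUTE BY, the query could also be written as:

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     CLUSTER BY 1
    ) t;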

The query runs much longer since all rows are now sent to a single reducer. Let's collect some statistics:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel2;
32093903        1       32093903

Now the assigned maximum sequential number is equal to the total number of rows in the table. EXPLAIN statement output is as follows:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
          Select Operator
            expressions: name (type: string), code (type: string), 
              Reduce Output Operator
                key expressions: 1 (type: int)
                sort order: +
                Map-reduce partition columns: 1 (type: int)
    Reduce Operator Tree:
      Extract
        Select Operator
          expressions: _col0 (type: string), _col1 (type: string), rownum2() (type: bigint)

The ROWNUM function is now called on the reduce side, which ensures row numbers are unique within the entire table (since we have only one reducer for the query). It is important to note that Hive still uses a single MapReduce job and just moved the function call to the reduce side; it did not run one MapReduce job for the subquery and another for the outer query.

Map Join Limitations – Out of Memory in Local Task

When Hive performs a map join, it first starts a local task that reads the side table (the "small" table in the join) directly from HDFS (without launching a MapReduce job) and builds a hash table (for more details, see MapJoin Implementation).

Hive creates the hash table in memory, and this imposes significant overhead. An additional factor is compression: the table may look quite small, but its size can grow 10x when it is decompressed.

For example, I have a table that contains 32 million rows and takes 1.3 GB in HDFS (SequenceFile with the Snappy compression codec), and I want to use this table in a map join. When this table is stored as an uncompressed text file, it takes about 12 GB.
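
For illustration, a join of the following shape would trigger the map join path described here; the large table name and join key are hypothetical, and with automatic conversion enabled Hive picks channel as the small (hash table) side on its own:

SELECT f.*, c.name
FROM sales f                          -- hypothetical large fact table
JOIN channel c ON f.code = c.code;    -- channel becomes the in-memory hash table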

By default, the maximum size of a table to be used in a map join (as the small table) is 1,000,000,000 bytes (about 1 GB), so I have to increase it for my table:

set hive.auto.convert.join.noconditionaltask=true;
set hive.auto.convert.join.noconditionaltask.size=2000000000;

Then when my table is used in a join, Hive uses a map join and launches a local task:

Starting to launch local task to process map join;      maximum memory = 4261937152
Processing rows:   200000  Hashtable size: 199999  Memory usage:  137524984    percentage:  0.032
Processing rows:   300000  Hashtable size: 299999  Memory usage:  183819200    percentage:  0.043
....

You can see that the maximum memory is 4 GB, and the percentage column (0.032 actually means 3.2%) shows the fraction of process memory occupied by the in-memory hash table. The hash table can grow until it reaches hive.mapjoin.localtask.max.memory.usage, which is 0.9 by default.
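
If the hash table exceeds the threshold only slightly and the process has headroom, you could raise the limit for the session before running the query; use this with caution, since setting it too high just trades this error for an OutOfMemoryError:

set hive.mapjoin.localtask.max.memory.usage=0.95;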

Eventually, my local task failed:

Processing rows:   7800000 Hashtable size: 7799999  Memory usage:  3726270024   percentage:  0.874
Processing rows:   7900000 Hashtable size: 7899999  Memory usage:  3784069496   percentage:  0.888
Processing rows:   8000000 Hashtable size: 7999999  Memory usage:  3832235792   percentage:  0.899
Processing rows:   8100000 Hashtable size: 8099999  Memory usage:  3890035464   percentage:  0.913
Execution failed with exit status: 3

The local task reached its memory limit at 0.913 (91.3% of process memory) after processing only 8.1M of 32M rows. This example shows that a map join local task is memory expensive: 4 GB was used to load just 25% of the rows.

You should take this into account, especially when tables are stored with compression. The table size may not look too large, but when the table is decompressed it can grow 10x or more, and Hive will not be able to build the in-memory hash table and use it in a map join.
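
If a table is too large to fit into the local task's memory, the simplest workaround is to let Hive fall back to a regular reduce-side (shuffle) join for that query, for example by disabling automatic map join conversion:

set hive.auto.convert.join=false;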