Invoking Stateful UDF at Map and Reduce Side in Hive

Typically, a scalar UDF processes one or more columns from the current row only. For example, the UPPER(col) function does not need to store the result of a previous call or access previous rows.

How do you write and correctly call a function that needs to keep state between calls? For example, let’s write a ROWNUM function that assigns sequential numbers to all rows in the table, no matter how many mappers are launched to read the data.

The simplest possible UDF:

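import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;

// Marked non-deterministic so that Hive evaluates the function for every row.
@UDFType(deterministic = false)
public class Rownum2 extends UDF {
  // Counter kept inside this UDF instance; it survives between evaluate() calls.
  private long rowNum = 0L;

  // Returns the next sequential number on each call.
  public Long evaluate() {
    rowNum++;
    return rowNum;
  }
}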

Note that it is important to specify @UDFType(deterministic = false); otherwise the function is called only once and every row gets the same value. Compile the program and create the JAR file:

javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Rownum2.java
jar cf Rownum2.jar Rownum2.class

Now launch Hive and create the function:

ADD JAR /home/v-dtolpeko/Rownum2.jar;
CREATE TEMPORARY FUNCTION rownum2 AS 'Rownum2';

The ROWNUM2 function is available until the end of the current session.
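To illustrate why the deterministic flag matters, here is a minimal sketch in plain Java. It is not Hive code: the class DeterministicFlagDemo, the Counter stand-in and the project method are hypothetical names, and the model simply assumes that a planner is free to evaluate a deterministic zero-argument function once and reuse the result for every row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified model (not Hive code) of how a planner may treat a zero-argument function.
class DeterministicFlagDemo {

    // Stand-in for the stateful UDF: returns 1, 2, 3, ... on successive calls.
    static final class Counter implements Supplier<Long> {
        private long rowNum = 0L;

        @Override
        public Long get() {
            return ++rowNum;
        }
    }

    // Produces one value per row. If the function is declared deterministic,
    // it is evaluated once and the result is reused for every row;
    // otherwise it is called once per row.
    static List<Long> project(int rows, Supplier<Long> fn, boolean deterministic) {
        List<Long> out = new ArrayList<>();
        Long folded = deterministic ? fn.get() : null;
        for (int i = 0; i < rows; i++) {
            out.add(deterministic ? folded : fn.get());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(project(5, new Counter(), true));   // [1, 1, 1, 1, 1]
        System.out.println(project(5, new Counter(), false));  // [1, 2, 3, 4, 5]
    }
}
```

With the flag set to true, the counter is advanced only once, which matches the "called only once" behavior described above.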

Map-Side UDF Execution

Let’s now run the following query on a sample table with over 30M rows:

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM channel;

It is important to notice how many mappers were launched for the query:

Hadoop job information for Stage-1: number of mappers: 7; number of reducers: 0

Now you can check how many rows are in the new table, and what the minimum and maximum values in the RN column are:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel2;
32093903  1        6873300

You can see that the assigned maximum sequential number is not equal to the total number of rows in the table. Investigating further:

SELECT rn, COUNT(*) FROM channel2 GROUP BY rn LIMIT 10;
1       7
2       7
3       7
4       7
5       7
6       7
7       7
8       7
9       7
10      7

You can see that each row number was assigned as many times as there were mappers, i.e. the ROWNUM2 function was executed on the map side, with each mapper counting from 1 independently. The EXPLAIN statement confirms this:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM channel;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
        Select Operator
          expressions: name (type: string), code (type: string), rownum2() (type: bigint)
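The duplicated row numbers can be reproduced with a small plain-Java model. This is only a sketch under stated assumptions, not Hive code: MapSideRownumDemo, Rownum and numberRows are hypothetical names, every mapper is assumed to create its own instance of the function, and all mappers are given the same toy number of rows (real input splits differ in size).

```java
import java.util.Map;
import java.util.TreeMap;

// Simplified model (not Hive code): every mapper gets its own UDF instance.
class MapSideRownumDemo {

    // Same counting logic as the Rownum2 UDF, without the Hive base class.
    static final class Rownum {
        private long rowNum = 0L;

        long evaluate() {
            return ++rowNum;
        }
    }

    // Each mapper numbers rowsPerMapper rows with a fresh counter.
    // Returns a map of rn -> number of rows that received that rn.
    static Map<Long, Integer> numberRows(int mappers, int rowsPerMapper) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int m = 0; m < mappers; m++) {
            Rownum udf = new Rownum();  // one instance per mapper, starting at 0
            for (int r = 0; r < rowsPerMapper; r++) {
                counts.merge(udf.evaluate(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 7 mappers as in the job above; 3 rows per mapper is an arbitrary toy size.
        System.out.println(numberRows(7, 3));  // {1=7, 2=7, 3=7}
    }
}
```

Each value appears seven times because the seven counters never see each other's state, which is the same pattern as in the GROUP BY output above.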

Reduce-Side UDF Execution

So how do you assign unique row numbers across the entire table? Let’s see how the following query works (drop channel2 first if it still exists from the previous run):

CREATE TABLE channel2 AS
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;

I used the constant 1 in DISTRIBUTE BY and SORT BY because the table has no partitions and I want to assign unique row numbers across the entire row set.

The query runs much longer because all rows are now sent to a single reducer. Now collect some statistics:

SELECT COUNT(*), MIN(rn), MAX(rn) FROM channel2;
32093903        1       32093903

Now the maximum assigned sequential number is equal to the total number of rows in the table. The EXPLAIN output is as follows:

EXPLAIN
  SELECT name, code, rownum2() AS rn
  FROM
    (SELECT name, code
     FROM channel
     DISTRIBUTE BY 1
     SORT BY 1
    ) t;
Stage: Stage-1
  Map Reduce
    Map Operator Tree:
      TableScan
        alias: channel
          Select Operator
            expressions: name (type: string), code (type: string), 
              Reduce Output Operator
                key expressions: 1 (type: int)
                sort order: +
                Map-reduce partition columns: 1 (type: int)
    Reduce Operator Tree:
      Extract
        Select Operator
          expressions: _col0 (type: string), _col1 (type: string), rownum2() (type: bigint)

The ROWNUM2 function is now called on the reduce side, which ensures that row numbers are unique across the entire table (since there is only one reducer for the query). Note that Hive still uses a single MapReduce job and simply moves the function call to the reduce side; it does not run one MapReduce job for the subquery and another for the outer query.
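The effect of distributing by a constant can also be sketched in plain Java. This is a simplified model, not Hive code: ReduceSideRownumDemo, Rownum, partition and numberRows are hypothetical names, and the partition method follows the style of Hadoop's default hash partitioner; the hash function Hive actually applies to the key may differ, but a constant key maps to one partition either way.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Hive code): rows are routed to reducers by a constant key.
class ReduceSideRownumDemo {

    // Same counting logic as the Rownum2 UDF, without the Hive base class.
    static final class Rownum {
        private long rowNum = 0L;

        long evaluate() {
            return ++rowNum;
        }
    }

    // Hash partitioning in the style of Hadoop's default partitioner (an assumption here).
    static int partition(int key, int reducers) {
        return (Integer.hashCode(key) & Integer.MAX_VALUE) % reducers;
    }

    // Sends totalRows rows to reducers using the constant key 1, and numbers the rows
    // in each reducer with that reducer's own counter.
    // Returns the row numbers assigned by each reducer.
    static List<List<Long>> numberRows(int totalRows, int reducers) {
        List<List<Long>> perReducer = new ArrayList<>();
        List<Rownum> udfs = new ArrayList<>();
        for (int i = 0; i < reducers; i++) {
            perReducer.add(new ArrayList<>());
            udfs.add(new Rownum());
        }
        final int key = 1;  // models DISTRIBUTE BY 1
        for (int r = 0; r < totalRows; r++) {
            int p = partition(key, reducers);
            perReducer.get(p).add(udfs.get(p).evaluate());
        }
        return perReducer;
    }

    public static void main(String[] args) {
        System.out.println(numberRows(6, 1));  // [[1, 2, 3, 4, 5, 6]]
        System.out.println(numberRows(6, 3));  // [[], [1, 2, 3, 4, 5, 6], []]
    }
}
```

In this model a single counter sees every row, so the numbers are unique, and the second call suggests that the same would hold with more reducers configured, because a constant key always hashes to the same partition. It also shows the cost noted above: one reducer does all the work.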
