Using Hive UDF to Perform Correlated Subquery

A correlated subquery is often used in traditional SQL databases to calculate the value of a result column using a complex expression that is not always possible to express with a join.

For example,

SELECT val,
  CASE 
    WHEN val BETWEEN 0 AND 7 THEN (SELECT score FROM scores WHERE score = val)
    WHEN val < 0 THEN 1 - (SELECT score FROM scores WHERE score * -1.00 = val)
    ELSE 0
  END AS score  
FROM vals

You can see that the resulting SCORE column is calculated using a CASE expression with two different subqueries against the SCORES table.

Let’s assume that SCORES is a small look-up table, so we can rewrite this query with a Hive UDF as follows:

SELECT val, fn_scores(val) AS score
FROM vals

Our Hive UDF will read data from a text file, build a hash map, and return the required value. Here is its code:

import java.util.HashMap;
import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.hadoop.hive.ql.exec.UDF;

public class Scores extends UDF {
  private HashMap<Double, Double> map;

  // Actual UDF code
  public Double evaluate(Double score) {
    if (score == null) {
      return 0d;
    }
    else if (score >= 0 && score <= 7) { 
      return get(score);     
    } 
    else if (score < 0) {
      return 1 - get(score * -1.00); 
    }
    return 0d;
  }

  private Double get(Double score) {
    if (map == null) {
       init();
    }
    // Return 0 for missing keys to avoid a NullPointerException on unboxing
    Double value = map.get(score);
    return value != null ? value : 0d;    
  }
 
  // Creating in-memory hash table for look-up (once per map task)
  private void init() {
    map = new HashMap<Double, Double>();
    try {
      BufferedReader file = new BufferedReader(new FileReader("score.txt"));
      String line = "";
      while ((line = file.readLine()) != null) {
        String[] parts = line.split("\t");
        map.put(Double.valueOf(parts[0]), Double.valueOf(parts[1]));
      }
      file.close();
    }
    catch (Exception e) {}   // Add proper error handling here
  }
}

This code assumes that the SCORES table data has been exported to a tab-delimited file named score.txt.
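
For example, score.txt might contain lines like these (hypothetical values; the first column is the look-up key, the second is the score, separated by a tab):

0	0.1
1	0.25
2	0.5
3	0.75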

The next step is to compile the class and create a JAR file:

javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Scores.java
jar cf Scores.jar Scores.class
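
Since the UDF is a plain Java class, you can also sanity-check the look-up logic locally before deploying it to the cluster. A minimal sketch (the ScoresTest class is not part of the UDF and assumes score.txt is in the current working directory):

public class ScoresTest {
  public static void main(String[] args) {
    Scores udf = new Scores();
    System.out.println(udf.evaluate(3.0));   // looked up in the hash map
    System.out.println(udf.evaluate(-2.0));  // 1 - value looked up for 2.0
    System.out.println(udf.evaluate(null));  // 0.0
    System.out.println(udf.evaluate(9.0));   // outside [0, 7], so 0.0
  }
}

Compile and run it with the same classpath used above: javac -classpath .:/usr/lib/hive/lib/hive-exec.jar ScoresTest.java, then java -classpath .:/usr/lib/hive/lib/hive-exec.jar ScoresTest.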

Before we can use our UDF, we have to run the following statements in Hive:

-- Add JAR and TXT files to the distributed cache so they are available on each cluster node
ADD JAR Scores.jar;
ADD FILE score.txt;

-- Register function
CREATE TEMPORARY FUNCTION fn_scores AS 'Scores';

Now when we use this UDF in Hive queries, the score.txt file is available on each node (similar to a replicated/broadcast join), loaded into an in-memory hash table by the UDF, and used to look up data. The initialization is performed once per map task.