Using Hive UDF to Perform Correlated Subquery

A correlated subquery is often used in traditional SQL databases to calculate the value of a resulting column using a complex expression that is not always possible to achieve with a join operator.

For example,

SELECT val,
  CASE
    WHEN val BETWEEN 0 AND 7 THEN (SELECT score FROM scores WHERE score = val)
    WHEN val < 0 THEN 1 - (SELECT score FROM scores WHERE score * -1.00 = val)
    ELSE 0
  END AS score
FROM vals

You can see that the resulting SCORE column is calculated using a CASE expression with two different subqueries against the SCORES table.

Let’s assume that SCORES is a small look-up table, so we can rewrite this query with a Hive UDF as follows:

SELECT val, fn_scores(val) AS score
FROM vals

Our Hive UDF will read data from a text file, build a hash map, and return the required value. Here is its code:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;

import org.apache.hadoop.hive.ql.exec.UDF;

public class Scores extends UDF {
  private HashMap<Double, Double> map;

  // Actual UDF code
  public Double evaluate(Double score) {
    if (score == null) {
      return 0d;
    } else if (score >= 0 && score <= 7) {
      return get(score);
    } else if (score < 0) {
      return 1 - get(score * -1.00);
    }
    return 0d;
  }

  private Double get(Double score) {
    if (map == null) {
      init();
    }
    return map.get(score);
  }

  // Creating in-memory hash table for look-up (once per map task)
  private void init() {
    map = new HashMap<Double, Double>();
    try {
      BufferedReader file = new BufferedReader(new FileReader("score.txt"));
      String line = "";
      while ((line = file.readLine()) != null) {
        String[] parts = line.split("\t");
        map.put(Double.valueOf(parts[0]), Double.valueOf(parts[1]));
      }
      file.close();
    } catch (Exception e) {}   // Add proper error handling here
  }
}

This code assumes that the SCORES table data is exported to a score.txt file (TAB-delimited).
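For illustration, such a TAB-delimited score.txt file might look like this (the key/value pairs below are made up; the first column is the look-up key, the second is the returned score):

0.0	0.00
1.0	0.10
3.0	0.30
7.0	0.70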

The next step is to compile the class and create a JAR file:

javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Scores.java
jar cf Scores.jar Scores.class

Before we can use our UDF we have to run the following statements in Hive:

-- Add JAR and TXT files to distributed cache so they are available on each cluster node
ADD JAR Scores.jar;
ADD FILE score.txt;

-- Register function
CREATE TEMPORARY FUNCTION fn_scores AS 'Scores';

Now when we use this UDF in Hive queries, the score.txt file is available on each node (some sort of replicated/broadcast join); the UDF loads it into an in-memory hash table and uses it to look up values. The initialization is performed once per map task.
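The "once per map task" behavior comes from the lazy initialization in get(): the map is built on the first call only and reused afterwards. Here is a minimal standalone sketch of that pattern (not the UDF itself; the class name, counter, and sample values are made up for illustration, replacing the read from score.txt):

```java
import java.util.HashMap;
import java.util.Map;

public class LazyLookupDemo {
  private static Map<Double, Double> map;
  private static int initCalls = 0;

  // Builds the look-up table; in the real UDF this reads score.txt
  private static void init() {
    initCalls++;
    map = new HashMap<Double, Double>();
    map.put(3.0, 0.3);   // illustrative key/value pairs
    map.put(5.0, 0.5);
  }

  static Double lookup(Double val) {
    if (map == null) {
      init();            // executed only once per JVM (per map task in Hive)
    }
    return map.get(val);
  }

  public static void main(String[] args) {
    System.out.println("score(3.0) = " + lookup(3.0));
    System.out.println("score(5.0) = " + lookup(5.0));
    lookup(3.0);         // does not trigger another init()
    System.out.println("init calls: " + initCalls);
  }
}
```

No matter how many rows the task processes, init() runs once, so the file-read cost is paid a single time per task rather than per row.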