Often a correlated subquery is used in traditional SQL databases to calculate the value of a resulting column using a complex expression that is not always possible to achieve with the join operator.
For example:
SELECT
    val,
    CASE
        WHEN val BETWEEN 0 AND 7 THEN (SELECT score FROM scores WHERE score = val)
        WHEN val < 0 THEN 1 - (SELECT score FROM scores WHERE score * -1.00 = val)
        ELSE 0
    END AS score
FROM vals
You can see that the resulting SCORE column is calculated using a CASE expression with two different subqueries against the SCORES table.
Let’s assume that SCORES is a small look-up table, so we can rewrite this query with a Hive UDF as follows:
SELECT val, fn_scores(val) AS score FROM vals
Our Hive UDF will read data from a text file, build a hash map, and return the required value. Here is its code:
import java.util.HashMap;
import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.hadoop.hive.ql.exec.UDF;

public class Scores extends UDF {
    private HashMap<Double, Double> map;

    // Actual UDF code
    public Double evaluate(Double score) {
        if (score == null) {
            return 0d;
        } else if (score >= 0 && score <= 7) {
            return get(score);
        } else if (score < 0) {
            return 1 - get(score * -1.00);
        }
        return 0d;
    }

    private Double get(Double score) {
        if (map == null) {
            init();
        }
        return map.get(score);
    }

    // Creating in-memory hash table for look-up (once per map task)
    private void init() {
        map = new HashMap<Double, Double>();
        try {
            BufferedReader file = new BufferedReader(new FileReader("score.txt"));
            String line;
            while ((line = file.readLine()) != null) {
                String[] parts = line.split("\t");
                map.put(Double.valueOf(parts[0]), Double.valueOf(parts[1]));
            }
            file.close();
        } catch (Exception e) {} // Add proper error handling here
    }
}
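To sanity-check the UDF outside Hive before deploying it, you can call evaluate directly from a small driver class. This is only an illustrative sketch: ScoresTest is a hypothetical class name, and it assumes a score.txt in the working directory that maps, say, 3.0 to 0.75.

// Hypothetical local test driver (not part of the UDF itself).
// Assumes score.txt is in the current working directory.
public class ScoresTest {
    public static void main(String[] args) {
        Scores fn = new Scores();
        System.out.println(fn.evaluate(3.0));  // direct look-up, e.g. 0.75
        System.out.println(fn.evaluate(-3.0)); // 1 - score for val * -1.00, e.g. 0.25
        System.out.println(fn.evaluate(null)); // null input returns 0.0
        System.out.println(fn.evaluate(9.0));  // outside both ranges returns 0.0
    }
}

Compile and run it with hive-exec.jar on the classpath, since Scores extends Hive's UDF class.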
This code assumes that the SCORES table data has been exported to a score.txt file (TAB-delimited).
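For illustration, a score.txt with made-up values might look like this (key and value on each line, separated by a single TAB character):

0.0	0.50
1.0	0.62
3.0	0.75
7.0	0.99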
The next step is to compile the class and create a JAR file:
javac -classpath .:/usr/lib/hive/lib/hive-exec.jar Scores.java
jar cf Scores.jar Scores.class
Before we can use our UDF, we have to run the following statements in Hive:
-- Add JAR and TXT files to the distributed cache so they are available on each cluster node
ADD JAR Scores.jar;
ADD FILE score.txt;

-- Register function
CREATE TEMPORARY FUNCTION fn_scores AS 'Scores';
Now when we use this UDF in Hive queries, the score.txt file is available on each node (a form of replicated/broadcast join), loaded into an in-memory hash table by the UDF, and used to look up data. The initialization is performed once per map task.