hflush – HDFS for Near Real-Time Applications

HDFS is a fault-tolerant, distributed file system, but how can you pass data from a producer to a consumer in near real-time?

Let’s create a simple Java program that writes the “Hello, world!” string to a file in HDFS and check what is required to allow readers to see the data immediately:

import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;

public class Write {
  public static void main(String[] args) throws Exception {
    Path path = new Path(args[0]);
    FileSystem fs = FileSystem.get(new Configuration());
            
    FSDataOutputStream out = fs.create(path, true /*overwrite*/);
            
    System.out.println("File created, check if metadata exists at NameNode.");
    System.in.read();  // Press Enter to continue 
            
    out.writeChars("Hello, world!");
            
    System.out.println("Data written, check if you can see them in another session.");
    System.in.read();  // Press Enter to continue
                        
    out.hflush();            
            
    System.out.println("Buffer flushed, check if you can see data now.");
    System.in.read();  // Press Enter to continue
                        
    out.close();
  }
}

You can compile the code using the following command (adjust the path to your Hadoop JAR version):

javac -classpath /usr/lib/hadoop/hadoop-common-2.4.0.2.1.5.0-695.jar Write.java

This sample program uses the default Hadoop configuration properties and requires you to specify an HDFS file name as the input parameter. You can run it as follows:

hadoop Write /user/v-dtolpeko/abc.txt 

The program executes a step and then waits until you press Enter before continuing with the next step. Now let’s check what happens in HDFS during each operation.

Visibility of File Creation in HDFS

After creating the file, the program waits for user input. You can open a second session and see that the newly created empty file is already visible to other sessions:

[v-dtolpeko ~]$ hadoop fs -ls /user/v-dtolpeko/abc.txt
-rw-rw-r-- 3 v-dtolpeko hdp_ewwdev  0 2014-11-11 00:59 /user/v-dtolpeko/abc.txt

Now press Enter so the application writes data to the file.

Unflushed Data Is Not Visible Yet

The application has just written the “Hello, world!” string to the file, but you still cannot see it in other sessions:

[v-dtolpeko ~]$ hadoop fs -cat /user/v-dtolpeko/abc.txt
[v-dtolpeko ~]$

Flushed Data Becomes Visible

When the application executes hflush(), the data is sent out of the client buffer to all DataNodes holding a replica of the block. Now any other application can see the data:

[v-dtolpeko ~]$ hadoop fs -cat /user/v-dtolpeko/abc.txt
Hello, world!
[v-dtolpeko ~]$

Note that when an HDFS block is completely filled, it is automatically flushed.

So you can use HDFS hflush() to allow consumers to immediately read each new portion of data written to a file, and you can rely on this feature to build near real-time applications that use HDFS as their storage.
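
For the consumer side, a reader just opens the file and sees everything the writer has flushed so far. Below is a minimal reader sketch (not part of the original example); it assumes the Write program above, so it decodes the UTF-16 big-endian bytes produced by writeChars():

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Read {
  public static void main(String[] args) throws Exception {
    Path path = new Path(args[0]);
    FileSystem fs = FileSystem.get(new Configuration());

    // For a file that is still open for write, the length visible to the reader
    // is determined when the stream is opened and covers the hflush()'ed data
    FSDataInputStream in = fs.open(path);
    BufferedReader reader = new BufferedReader(
        new InputStreamReader(in, StandardCharsets.UTF_16BE));

    String line;
    while ((line = reader.readLine()) != null) {
      System.out.println(line);
    }

    reader.close();
  }
}

A real near real-time consumer would typically poll the file, keep track of the offset it has already processed, and reopen the stream to pick up data flushed after it was opened.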

SELECT COUNT(*) on SequenceFiles and Text

What happens when you run a SELECT COUNT(*) statement for a table stored in the SequenceFile or text file format in Hive? How many resources does this operation require?

SELECT COUNT(*) FROM orders;

Let’s run the query on a table containing about 1 billion rows (1,075,477,972 rows) to investigate the details. In my example 242 mappers and 1 reducer were launched, and the job result is as follows:

Job 0: Map: 242  Reduce: 1   Cumulative CPU: 2763.69 sec   HDFS Read: 80211919808 HDFS Write: 11 SUCCESS
Total MapReduce CPU Time Spent: 46 minutes 3 seconds 690 msec
OK
1075477972
Time taken: 70.251 seconds, Fetched: 1 row(s)

First, you can see that the mappers had to read all table data from HDFS (80 GB), while only the final count was written to HDFS (11 bytes):

[Screenshot: job counters showing HDFS bytes read and written]

The good news is that each mapper calculated its own local row count, so the reducer just had to sum 242 counts to get the final row count:

[Screenshot: map output and reduce input record counters]

So when calculating the number of rows in a table stored in SequenceFile or text format, remember that the map tasks read all table rows from HDFS, and this operation can be quite expensive depending on the table size.

At the same time, the reducer does not need to process all rows; it just receives a local count from each mapper, so network I/O between the mappers and the reducer is very low, and the reduce step is inexpensive no matter how many rows are in the table.
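
Hive generates its own execution plan for this query, but the underlying idea can be shown with a hand-written MapReduce sketch (a simplified illustration, not Hive's actual code; the job driver is omitted): each map task only increments a local counter and emits a single record in cleanup(), and the single reducer sums the partial counts.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class RowCount {

  // Each mapper reads its entire split row by row, but writes nothing per row
  public static class CountMapper
      extends Mapper<LongWritable, Text, NullWritable, LongWritable> {
    private long rows = 0;

    @Override
    protected void map(LongWritable key, Text value, Context context) {
      // A WHERE predicate would simply guard this increment;
      // the split is still read in full either way
      rows++;
    }

    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      // One output record per map task: the local row count
      context.write(NullWritable.get(), new LongWritable(rows));
    }
  }

  // The single reducer receives one local count per mapper and sums them
  public static class CountReducer
      extends Reducer<NullWritable, LongWritable, NullWritable, LongWritable> {
    @Override
    protected void reduce(NullWritable key, Iterable<LongWritable> counts, Context context)
        throws IOException, InterruptedException {
      long total = 0;
      for (LongWritable count : counts) {
        total += count.get();
      }
      context.write(NullWritable.get(), new LongWritable(total));
    }
  }
}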

Note that if your query includes a WHERE condition, it still requires the same I/O resources to execute the statement:

SELECT COUNT(*) FROM orders WHERE code = 'A1';

Mappers still need to read all data from HDFS; the code = 'A1' condition only affects their local row counts (and adds some time to compare values). The reducer still needs to read the local row counts from the 242 mappers and sum them.

ORCFile Table Scan Performance

Let’s evaluate the performance of a table scan operation using various file formats available in Hive. I have a table containing 32M rows, and I need to execute a simple query that returns only 43 rows:

SELECT * FROM channels WHERE code = 'NOT APPLICABLE';

The size of the table when stored in different formats with different compression codecs is as follows:

Format        Compression   Size
Text          Uncompressed  13.1 GB
SequenceFile  Uncompressed  12.3 GB
SequenceFile  Snappy        1.3 GB
ORCFile       Snappy        561 MB
ORCFile       ZLIB          341 MB

You can see how compression, and especially the ORCFile format, reduces the storage size. Now let’s run the query and see its performance.

SequenceFile Snappy

First, let’s query the table containing data in SequenceFile format with Snappy compression:

SELECT * FROM channels WHERE code = 'NOT APPLICABLE';

Data in HDFS:

[Screenshot: table files in HDFS]

Nine map tasks were executed, each completing within 15-30 seconds:

[Screenshot: map task execution times]

Looking at the log of the fastest mapper, we can see that it read and processed 3M rows:

[Screenshot: mapper log showing about 3M input records]

The job read 1.2 GB of data from HDFS, which equals the full SequenceFile size:

[Screenshot: HDFS bytes read counter]

ORCFile ZLIB

Now let’s query the table containing data in ORCFile format with ZLIB compression:

SELECT * FROM channels WHERE code = 'NOT APPLICABLE';

Data in HDFS:

[Screenshot: table files in HDFS]

Only one map task was launched, and it completed in 12 seconds:

[Screenshot: map task execution time]

The mapper log shows that it processed only 1,000 rows:

[Screenshot: mapper log showing 1,000 input records]

Note that although the table size is 341 MB, only 86 MB were read from HDFS:

[Screenshot: HDFS bytes read counter]

The ORCFile format not only reduces the storage size, it also helps avoid full table scans. Its inline indexes determine which blocks of data to read and which to skip. As you can see, ORCFile can dramatically reduce the resources required to perform a table scan.
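
To make the index idea concrete, here is a conceptual sketch of how per-stripe min/max statistics let a reader skip data without scanning it. This is an illustration of the principle only, not the actual ORC reader API, and the column ranges in it are invented for the example:

public class StripeSkipExample {

  // Hypothetical per-stripe statistics for the "code" column
  static class StripeStats {
    final String minCode;
    final String maxCode;

    StripeStats(String minCode, String maxCode) {
      this.minCode = minCode;
      this.maxCode = maxCode;
    }
  }

  // The stripe may contain code = 'NOT APPLICABLE' only if the literal
  // falls inside the stripe's [min, max] range
  static boolean mayContain(StripeStats stats, String literal) {
    return stats.minCode.compareTo(literal) <= 0
        && stats.maxCode.compareTo(literal) >= 0;
  }

  public static void main(String[] args) {
    // Invented stripe ranges: only the second stripe can hold the value
    StripeStats[] stripes = {
        new StripeStats("AFFILIATE", "DIRECT"),
        new StripeStats("MOBILE", "ONLINE"),
        new StripeStats("PARTNER", "WHOLESALE")
    };

    for (int i = 0; i < stripes.length; i++) {
      if (mayContain(stripes[i], "NOT APPLICABLE")) {
        System.out.println("Stripe " + i + ": read and evaluate the predicate row by row");
      } else {
        System.out.println("Stripe " + i + ": skip without reading");
      }
    }
  }
}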