Parallelizing bzip2 file reads


[ Date | 2021-03-31 21:37 -0400 ]
[ Mod. | 2021-05-14 15:25 -0400 ]

Problem statement

In some "Big Data" circles, single, large compressed files are a known cause of bottlenecks for parallel computation: unless the job of decompression can be naturally split over several computation units, decompression has to happen without parallelism.

On clusters that may have hundreds of computation units, it is wasteful to let the cluster mostly idle as only N units are able to concurrently read N input files, when N is much smaller than the number of available computation units.

File formats designed for parallelism allow extensive concurrency: it is possible to quickly find indices to the start of each record, or to small groups of records, and to spread the actual job of deserializing over as many workers as needed.

For various possible reasons, interoperability among them, it happens that the input is simply a text-based format (perhaps JSON, perhaps TSV) compressed with a standard block-compression algorithm: gzip, bzip2, xz. Sadly, of those, only bzip2 can be read in parallel using Apache Hadoop.

How does Hadoop "split" bzip2 to allow parallel decompression?


Does the bzip2 file format have a neat index of the starting offset of all blocks? Or, instead, does each block begin with a size marker, allowing blocks to be skipped efficiently?

Given that bzip2 is indeed "splittable" in Hadoop, and that this checks out experimentally (switching a large, single input from gzip to bzip2 increases concurrency from 1 to as many computation units as are available), I would have expected, before even looking into internals, the on-disk format to be something like the following, where block-length would allow skipping:

symbol := expression
compressed-file := file-header block*
file-header := misc
block := block-header block-payload
block-header := block-length misc
block-length := some encoding for one integer
block-payload := bytes
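To make the hypothetical concrete, here is a minimal sketch of the skipping that such a block-length field would enable. The 4-byte big-endian length prefix is an assumption chosen for illustration; it is not part of the real bzip2 format.

```java
// Hypothetical layout, NOT real bzip2: each block is a 4-byte big-endian
// payload length followed by that many payload bytes.
public class LengthPrefixedSkipper {

    // Returns the byte offset of each block, skipping payloads by
    // arithmetic alone -- the kind of cheap seek a length field in the
    // block header would make possible.
    public static long[] blockOffsets(byte[] file, int blockCount) {
        long[] offsets = new long[blockCount];
        int pos = 0;
        for (int i = 0; i < blockCount; i++) {
            offsets[i] = pos;
            int len = ((file[pos] & 0xFF) << 24) | ((file[pos + 1] & 0xFF) << 16)
                    | ((file[pos + 2] & 0xFF) << 8) | (file[pos + 3] & 0xFF);
            pos += 4 + len;  // jump straight past the payload, unread
        }
        return offsets;
    }
}
```

With such a layout, finding the Nth block costs N header reads, never touching payload bytes.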

Unfortunately, it looks like bzip2 block headers do not include a size marker; instead, parallel decompressors rely on searching for 48-bit magic patterns that mark block boundaries (the 6-byte BCD encodings of π and √π, incidentally); the actual format is [1]:

block-header := block-magic block-crc randomized orig-ptr

None of the fields within the block header is a length that would allow skipping straight to the next block.

This still allows some amount of "skipping", in the sense that a reader can find block boundaries without decompressing, but it is not true skipping in the sense of not reading irrelevant bytes: the reader must go through the input linearly, examining most or all bytes, saving only the time that would have been spent decompressing.
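The linear scan described above can be sketched as follows. This is an illustration rather than Hadoop's code: since bzip2 blocks are not byte-aligned, the scan feeds the input one bit at a time into a 48-bit window and compares it against the block magic at every bit offset.

```java
// Sketch: find the 48-bit bzip2 block magic at an arbitrary bit offset.
public class BlockMagicScanner {

    public static final long BLOCK_MAGIC = 0x314159265359L; // BCD of pi
    private static final long MASK = (1L << 48) - 1;

    // Returns the bit offset just past the first occurrence of the block
    // magic in buf, or -1 if absent. Bits are consumed MSB-first, matching
    // the bzip2 bit stream order.
    public static long findMagic(byte[] buf) {
        long window = 0;
        long bitsRead = 0;
        for (byte b : buf) {
            for (int i = 7; i >= 0; i--) {
                int bit = (b >> i) & 1;
                window = ((window << 1) | bit) & MASK; // keep last 48 bits
                bitsRead++;
                if (bitsRead >= 48 && window == BLOCK_MAGIC) {
                    return bitsRead;
                }
            }
        }
        return -1;
    }
}
```

Every input bit passes through the window, which is why this scan reads (nearly) everything even though it decompresses nothing.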

Hadoop implementation

The Hadoop reader implementation [2] has a skipToNextMarker method, which looks for a bit pattern within a stream: it is used to split the bzip2 stream into blocks, by searching for the block start marker:

public class CBZip2InputStream extends InputStream implements BZip2Constants {

  public static final long BLOCK_DELIMITER = 0X314159265359L; // start of block
  // ...
  private static final int DELIMITER_BIT_LENGTH = 48;

As far as I can tell, the process is very straightforward: read DELIMITER_BIT_LENGTH bits from the stream, test them for equality with BLOCK_DELIMITER, and shift in additional bits one at a time until the pattern is matched or the end of the buffer is reached.
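Once each reader can align itself on a block boundary, a byte-range split can claim blocks without coordination. The rule sketched below, that a split owns exactly the blocks starting within its range, is a common convention for splittable inputs; the names are illustrative, not Hadoop's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how a byte-range split could claim bzip2 blocks. A precomputed
// list of block start bit offsets stands in here for the magic scan.
public class SplitAligner {

    // Given block start offsets (in bits) and a split [startBit, endBit),
    // return the blocks this split is responsible for: those starting in
    // [startBit, endBit). Neighbouring splits apply the same rule, so each
    // block is processed exactly once, with no coordination needed.
    public static List<Long> blocksForSplit(long[] blockStarts,
                                            long startBit, long endBit) {
        List<Long> mine = new ArrayList<>();
        for (long s : blockStarts) {
            if (s >= startBit && s < endBit) {
                mine.add(s);
            }
        }
        return mine;
    }
}
```

A block straddling a split boundary is read entirely by the split that contains its start, which may read past its nominal end.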


  1. Adapted from Joe Tsai, Bzip2: Format Specification.

  2. As of 2021-03-31.
