Monday, February 14, 2011

High-Level Hadoop

At a high level, MapReduce transforms each input record into one or more intermediate records, then combines those intermediate records into the final output. The way it does this can seem like magic, since so much of it is handled by the framework. Hopefully, this post will demystify some of what's going on.

You start by configuring the job with a Configuration. The configuration tells Hadoop things like the location of your input, where to put your output, what classes implement your mapper and reducer, and how the input and output are formatted.
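
To make that concrete, here's a rough sketch of a driver using the org.apache.hadoop.mapreduce API; MyDriver, MyMapper, MyReducer, and the word-count-flavored key/value types are stand-ins for whatever your job actually needs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class MyDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "my job");

            job.setJarByClass(MyDriver.class);
            job.setMapperClass(MyMapper.class);               // your map implementation
            job.setReducerClass(MyReducer.class);             // your reduce implementation

            job.setInputFormatClass(TextInputFormat.class);   // how input files become records
            job.setOutputFormatClass(TextOutputFormat.class); // how the results get written out

            job.setOutputKeyClass(Text.class);                // the types the reducer emits
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));    // where the input lives
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  // where to put the output

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }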

The input format and its record reader take the input file (or files) and split them into records that are passed to your map class. [1]

For each record that the record reader produces, map gets called once to produce its output records. Those records are buffered in memory for a while and then written to disk when memory runs low or the map phase is finished. [2]
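
For example, with the stock TextInputFormat the record reader hands the mapper each line's byte offset as the key and the line itself as the value. A minimal, hypothetical word-count-style mapper (the MyMapper referenced in the driver sketch above) might look like this:

    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Called once per input record; emits a (word, 1) pair for every word on the line.
    public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(line.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);   // buffered in memory, spilled to disk as needed
            }
        }
    }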

Once the mappers are done, their output gets sorted by key so that records with the same key are grouped together. Those records are then partitioned by key and sent off to the reducers (all of the values for any given key go to the same reducer).
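
The decision about which reducer gets which key is made by the partitioner. The default behaves roughly like the sketch below: hash the key and mod by the number of reducers, which is why every value for a given key ends up on the same reducer.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Roughly what Hadoop's default hash partitioning does: hash the key and mod by the
    // number of reducers, so a given key always lands on the same reducer.
    public class SketchPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }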

The reducer then gets called with a key and all of the values for that key (as an Iterator). It does its thing with those values and writes the results to its Context, which hands them off to the output format to be persisted.
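
Continuing the hypothetical word-count example, here's a reducer that sums all of the values for each key (note that the newer mapreduce API hands you an Iterable rather than an Iterator):

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Called once per key with all of that key's values; writes one total per word.
    public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : counts) {
                sum += count.get();
            }
            total.set(sum);
            context.write(word, total);   // handed to the output format for persistence
        }
    }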

Footnotes:
1. Files in HDFS are split into blocks and replicated across the cluster. For performance, Hadoop will try to run the mappers on the same machines that house the blocks they read. Unless you're writing your own input format, this will be transparent to you.

2. You can also add a combiner, which does the same thing as the reducer on the map side and, in fact, can often be the same class as your reducer. Adding one can dramatically improve performance by reducing the amount of data that has to be sorted and shipped to the reducers.
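
If your reduce logic is associative and commutative, like the summing reducer sketched above, wiring in a combiner is a single extra line in the driver:

    // Run the reduce logic on each mapper's local output before the shuffle.
    job.setCombinerClass(MyReducer.class);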

Sunday, February 6, 2011

Know When to Fold 'Em

The Map & Reduce approach to dealing with large data sets, as implemented in Hadoop, has its background in functional programming, where both are used to manipulate the elements of a collection. Specifically, map is used to transform each element of the collection into some other value (often of another type), for example to turn a string representation of an integer into an actual integer. Reduce, on the other hand, is used to combine all of the elements of the collection into a single result (which may, in turn, be another collection).
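
As a quick illustration in Java (using the java.util.stream API rather than anything Hadoop-specific): map parses each string into an integer, and reduce combines the integers into a single sum.

    import java.util.Arrays;
    import java.util.List;

    public class MapReduceIdea {
        public static void main(String[] args) {
            List<String> raw = Arrays.asList("1", "2", "3", "4");

            int sum = raw.stream()
                         .map(Integer::parseInt)   // map: turn each "n" string into the int n
                         .reduce(Integer::sum)     // reduce: combine all the ints into one result
                         .orElse(0);               // (empty-collection case)

            System.out.println(sum);               // prints 10
        }
    }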

With just these two “primitives”, you can do a lot of powerful number crunching (or string manipulation, as the case may be), but what if you've got a lot of data coming in constantly and you need to be able to answer questions quickly? (Hadoop tends to have the throughput of a delivery truck full of hard drives and the latency of a delivery truck full of hard drives.)

First, let's tackle the delivering-results-quickly part of the problem. Traditional RDBMSs handle this by building indices on the fields that are searched most frequently. Fortunately, there's absolutely nothing that says we can't do the same thing with map-reduce. By building indices with map-reduce, we can answer questions about the data (at least the ones we know about ahead of time) much more quickly. We can also precompute things like sums and averages, so that answering an analytics question becomes a simple value lookup.
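
As a hypothetical sketch of the indexing idea, assuming tab-separated records where the first field is a record id and the second is the field we want to index: the mapper emits (field value, record id) pairs and the reducer concatenates the ids, so finding every record with a given value becomes a single key lookup.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Emits (indexed value, record id) for each input record.
    public class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text record, Context context)
                throws IOException, InterruptedException {
            String[] fields = record.toString().split("\t");
            // key = the value being indexed, value = the id of the record containing it
            context.write(new Text(fields[1]), new Text(fields[0]));
        }
    }

    // One output line per indexed value: "value   id1,id2,id3"
    class IndexReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text value, Iterable<Text> recordIds, Context context)
                throws IOException, InterruptedException {
            StringBuilder ids = new StringBuilder();
            for (Text id : recordIds) {
                if (ids.length() > 0) {
                    ids.append(",");
                }
                ids.append(id.toString());
            }
            context.write(value, new Text(ids.toString()));
        }
    }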

Now, on to the constant-influx-of-data problem. In functional programming, reduce has a “sister” function called fold that does the same thing as reduce except that it takes a starting value to reduce (or fold) into. With fold, you can take the results you calculated earlier and incorporate them into the new data (or vice versa).

So, right now you may be thinking, “Well hello Mr. Fancy Pants. I've got news for you pal, Hadoop doesn't support fold, just reduce!” This is where the relationship between fold and reduce comes in (warning, LISP-like pseudocode follows): (fold f startingValue list) has the same effect as (reduce f (cons startingValue list)). We can take advantage of that equivalence by having a mapper re-emit the previous run's results as input to a reducer, which combines them with the results from the new data.
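
Sticking with the hypothetical word-count job, the trick amounts to feeding the previous run's output back in as just another input: a pass-through mapper re-emits each already-computed (word, count) line, and the same summing reducer folds those old totals in with the counts from the new data.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Reads lines of a previous run's output ("word<TAB>count") and re-emits them so the
    // reducer sees the old totals alongside the new data's (word, 1) pairs.
    public class PreviousResultsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] parts = line.toString().split("\t");
            context.write(new Text(parts[0]), new IntWritable(Integer.parseInt(parts[1])));
        }
    }

Hadoop's MultipleInputs helper lets you point each input path at its own mapper, so the previous results and the new data can feed the same reducer in a single job.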