
## BSP with Hadoop (and without)

I have recently started to look at the BSP (Bulk Synchronous Parallel) model of computation for large-scale graph analysis.
The model itself is quite old, but it has seen a resurgence lately, mainly because of the Pregel system by Google. Here is the paper.
The idea of Pregel is to use synchronous barriers to cleanly separate messages into rounds.
In each step, a vertex can read the messages sent to it in the previous step, perform some computation, and generate messages for the next step.
When all the vertices vote to halt and no more messages are in flight, the computation stops.
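The loop above fits in a few lines of code. Here is a toy single-process sketch of the superstep loop (my own simplified model, not any framework's actual API), using the classic example of propagating the maximum value over a graph:

```java
import java.util.*;

// Toy sketch of the Pregel loop: each round, every vertex reads the messages
// from the previous round, updates its value, and messages its neighbors;
// the run stops when a round produces no messages (all vertices halted).
public class MaxPropagation {
    public static int[] run(int[] values, int[][] neighbors) {
        Map<Integer, List<Integer>> inbox = new HashMap<>();
        // superstep 0: every vertex sends its initial value to its neighbors
        for (int v = 0; v < values.length; v++)
            for (int n : neighbors[v])
                inbox.computeIfAbsent(n, k -> new ArrayList<>()).add(values[v]);
        while (!inbox.isEmpty()) { // synchronous barrier between rounds
            Map<Integer, List<Integer>> next = new HashMap<>();
            for (Map.Entry<Integer, List<Integer>> e : inbox.entrySet()) {
                int v = e.getKey();
                int max = values[v];
                for (int m : e.getValue()) max = Math.max(max, m);
                if (max > values[v]) { // value changed: keep propagating
                    values[v] = max;
                    for (int n : neighbors[v])
                        next.computeIfAbsent(n, k -> new ArrayList<>()).add(max);
                } // otherwise the vertex stays halted this round
            }
            inbox = next;
        }
        return values;
    }

    public static void main(String[] args) {
        int[] values = {3, 6, 2, 1};
        int[][] neighbors = {{1}, {0, 2}, {1, 3}, {2}}; // a path 0-1-2-3
        System.out.println(Arrays.toString(run(values, neighbors))); // prints [6, 6, 6, 6]
    }
}
```

Note how termination needs no global coordination beyond the barrier: it is simply the round in which nobody sent anything.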

As with MapReduce and Hadoop, a number of clones of Pregel have popped up.
I will briefly review the most interesting ones in this post.

Apache Giraph

Giraph is a Hadoop-based graph processing engine open sourced by Yahoo! and now in the Apache Incubator.
It is at a very early stage, but the community looks very active and diverse. Giraph developers play in the same backyard as the Hadoop, Pig, and Hive developers, which makes me feel confident about its future.
Giraph uses Hadoop for scheduling its workers and for loading the data into memory, but then implements its own communication protocol on top of Hadoop RPC. It also uses ZooKeeper for fault tolerance via periodic checkpointing.
Interestingly, Giraph does not require installing anything on the cluster, so you can try it out on your existing Hadoop infrastructure. A Giraph job runs like a normal map-only job. It can actually be thought of as a graph-specific Hadoop library.
However, because of this, the API currently lacks encapsulation. Users need to write a custom Reader + Writer + InputFormat + OutputFormat for each Vertex program they create. A library to read common graph formats would be a nice addition.

The good:
Very active community with people from the Hadoop ecosystem.

The bad:
Early stage.

Apache Hama

Hama is a Hadoop-like solution and the oldest member of the group. It is currently in the Apache Incubator and was initially developed by an independent Korean programmer.
Hama focuses on general BSP computations, so it is not only for graphs. For example, there are algorithms for matrix inversion and linear algebra (I know, one could argue that a graph and a matrix are actually the same data structure).
Unfortunately, the project seems to be moving slowly, even though lately there has been a spike of activity, probably caused mainly by the GSoC (it works!).
Currently it is still at a very early stage: the current version doesn't provide a proper I/O API or a data partitioner. From my understanding, there is no fault tolerance either.
From a technical point of view, Hama uses ZooKeeper for coordination and HDFS for data persistence. It is designed to be “the Hadoop of BSP”.
Given its focus on general BSP, the primitives it provides are at a low level of abstraction, much like a restricted version of MPI.
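To give an idea of what "low level" means here: at this level of abstraction a program sees explicit sends plus a barrier synchronization between supersteps. A toy sketch in plain Java threads (my own illustration, not Hama's actual API):

```java
import java.util.*;
import java.util.concurrent.*;

// Toy illustration of low-level BSP primitives: explicit message sends
// plus a barrier between supersteps. Plain Java threads, not Hama's API.
public class BspToy {

    static int[] run() throws Exception {
        final int peers = 2;
        final CyclicBarrier barrier = new CyclicBarrier(peers);
        final List<Queue<Integer>> inbox = new ArrayList<>();
        for (int i = 0; i < peers; i++)
            inbox.add(new ConcurrentLinkedQueue<Integer>());
        final int[] received = new int[peers];

        Thread[] threads = new Thread[peers];
        for (int p = 0; p < peers; p++) {
            final int me = p;
            threads[p] = new Thread(() -> {
                try {
                    // superstep 0: send a message to the other peer
                    inbox.get((me + 1) % peers).add(me * 10);
                    barrier.await(); // barrier: end of superstep 0
                    // superstep 1: read what was sent in the previous superstep
                    received[me] = inbox.get(me).poll();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) t.join();
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Arrays.toString(run())); // prints [10, 0]
    }
}
```

The programmer is responsible for addressing peers and placing the barriers; there is no vertex-centric sugar on top.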

The good:
General BSP.
Complete system.
The logo is cute.

The bad:
Early stage.
Graph processing library not yet released.

GoldenOrb

GoldenOrb is a Hadoop-based Pregel clone open sourced by Ravel. It should be in the process of entering the Apache Incubator.
It is a close clone of Pregel and very similar to Giraph: a system to run distributed iterative algorithms on graphs.
The components of the system, like vertex values (state), edges, and messages, are built on top of Hadoop's Writable system.
One thing I don't understand is why they didn't leverage Java generics. To get the value of a message you need to do ugly explicit casts:

    @Override
    public void compute(Collection<IntMessage> messages) {
        int _maxValue = 0;
        for (IntMessage m : messages) {
            int msgValue = ((IntWritable) m.getMessageValue()).get();
            _maxValue = Math.max(_maxValue, msgValue);
        }
    }
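For comparison, here is how a generically-typed message interface could remove the cast. The interface and class names below are my own sketch, not GoldenOrb's actual API:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical generically-typed message interface (not GoldenOrb's real
// API): the payload type is a type parameter, so getMessageValue()
// needs no cast at the call site.
interface Message<V> {
    V getMessageValue();
}

class IntMessage implements Message<Integer> {
    private final int value;
    IntMessage(int value) { this.value = value; }
    @Override public Integer getMessageValue() { return value; }
}

public class GenericMessages {
    // the same compute() logic as above, now cast-free
    static int maxValue(Iterable<IntMessage> messages) {
        int max = 0;
        for (IntMessage m : messages)
            max = Math.max(max, m.getMessageValue());
        return max;
    }

    public static void main(String[] args) {
        List<IntMessage> msgs = Arrays.asList(
                new IntMessage(3), new IntMessage(7), new IntMessage(5));
        System.out.println(maxValue(msgs)); // prints 7
    }
}
```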

As with Giraph, Hadoop details leak into the API. Unlike Giraph, however, GoldenOrb requires additional infrastructure to run: an OrbTracker needs to be running on each Hadoop node. It also uses Hadoop RPC for communication. The administrator can define the number of partitions to assign to each OrbTracker, and the number of threads to launch per node can be configured on a per-job basis.

The good:
Commercial support.

The bad:
Early stage, not yet in the Incubator.
API has rough edges.

JPregel

A final mention goes to JPregel. It is a Java clone of Pregel that is not Hadoop-based.
The programming interface has been designed from the ground up and is very clean.
Right now it is at a very, very early stage of development: e.g., messages can only be doubles, and the system itself is not yet finalized.
Interestingly, there is no explicit halting primitive. From what I understood, it automatically deduces termination from the absence of new messages.
It is a very nice piece of work, especially considering that it was done by just a couple of first-year master's students.

PS: I waited too long before publishing this post and was beaten to it by one of the Giraph committers, even though he also reviews some other software. Have a look at his post.

## \begin{thesis}

Piled Higher and Deeper.

Crossing fingers!

## Grokking data

• When you have been exploring a dataset for a while, studying its distribution, its composition, its quirks, its innards and, in the end, its “essence”.
• When you know the answer to a query even before performing it.
• When you have a strict hierarchical organization of the folders for plots.
• When you know by heart the number of unique URLs and usernames in the dataset.
• When your methodology for naming files according to their schema has become more complex than the schemas themselves.
• When you have five different versions of the dataset, but you forgot the reason behind four of them.
• When the size of the scripts to analyze the dataset begins to rival the dataset itself.
• Ultimately, when the mere thought of putting your hands on that data again gives you urticaria.
That’s when you grokked the data.
Now imagine doing that on hundreds of gigabytes…

## Iterative algorithms in MapReduce

While speaking about my latest work with some researchers, they confessed to me that they were very curious about how to implement an iterative algorithm in MapReduce/Hadoop. They made me realize that the task is not exactly straightforward, so I decided to write a post about it, and here we are.

The general idea for iterative algorithms in MapReduce is to chain multiple jobs together, using the output of one job as the input of the next. An important consideration is that, given the usual size of the data, the termination condition must be computed within the MapReduce program itself. The standard MapReduce model does not offer a simple, elegant way to do this, but Hadoop has an added feature that simplifies the task: Counters.

To check for a termination condition with Hadoop counters, you run the job and collect statistics while it is running. Then you access the counters and read their values; you might also compute derived measures from them. Finally, you decide whether to continue iterating or to stop.

Here is an example. The code is only partial; it's intended just to show the technique. Suppose we want to do some directed graph processing using a diffusion process (the exact application doesn't matter; I am using a coloring example here). Assume the mapper propagates the information along the graph and the reducer computes the new color.

    public static enum Color {
        RED, GREEN, BLUE;
    }

    public static enum State {
        UPDATED;
    }

    public static class ExampleReducer extends Reducer<Vertex, Edge, Vertex, Edge> {

        @Override
        protected void reduce(Vertex key, Iterable<Edge> inLinks, Context context)
                throws IOException, InterruptedException {
            int numRed = 0, numGreen = 0, numBlue = 0;
            for (Edge e : inLinks) {
                switch (e.color()) {
                case RED:
                    numRed++;
                    break;
                case GREEN:
                    numGreen++;
                    break;
                case BLUE:
                    numBlue++;
                    break;
                default:
                    assert false : "Unexpected color " + e;
                }
            }
            // global color statistics, aggregated across all reducers
            context.getCounter(Color.RED).increment(numRed);
            context.getCounter(Color.GREEN).increment(numGreen);
            context.getCounter(Color.BLUE).increment(numBlue);

            Color currentColor = key.color();
            Color newColor = computeNewColor(numRed, numGreen, numBlue);
            if (currentColor != newColor) {
                context.getCounter(State.UPDATED).increment(1);
                key.setColor(newColor);
            }
            for (Edge e : key.outLinks()) {
                e.setColor(newColor);
                context.write(key, e);
            }
        }
    }

In this snippet, we scan the incoming edges of each vertex and count the edges of each color. Then we compute the new color based on some procedure (omitted for brevity) and write the updated results.
After the scan, we use counters to report how many edges of each color we have seen. At the end of the job, this gives us a global view of the colors in the graph.
If we update the vertex with a new color, we report it through another counter (UPDATED). At the end of the job, this counter tells us how many updates were performed during the round.

Now suppose we want to run this algorithm until it stabilizes, that is, until no more updates occur. To do this, we can simply check the number of updates reported by the counter and stop when it reaches zero. This must be done in the driver program, between successive invocations of the MapReduce job.

Another important aspect of an iterative algorithm in Hadoop is defining a naming scheme for the iterations. The simplest approach is to create a working directory based on the input or output name. Each iteration then uses a directory inside the working directory for its input and output.

    @Override
    public int run(String[] args) throws Exception {
        int iteration = 0;
        boolean verbose = false;
        Path inputPath = new Path(args[0]);
        Path basePath = new Path(args[1] + "_iterations");
        FileSystem fs = FileSystem.get(getConf());
        assert fs.exists(inputPath) : "Input path does not exist: " + inputPath;
        // working directory
        fs.delete(basePath, true);
        fs.mkdirs(basePath);

        // copy input into working dir and preprocess it
        Path outputPath = new Path(basePath, iteration + "");
        Job job0 = getPrototypeJob(iteration);
        FileInputFormat.setInputPaths(job0, inputPath);
        FileOutputFormat.setOutputPath(job0, outputPath);
        job0.setMapperClass(InitMapper.class);
        job0.setReducerClass(InitReducer.class);
        if (!job0.waitForCompletion(verbose))
            return -1;

        while (true) {
            iteration++; // iteration counter
            inputPath = outputPath; // new input is the old output
            outputPath = new Path(basePath, iteration + "");
            Job job = getPrototypeJob(iteration);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            job.setMapperClass(ExampleMapper.class);
            job.setReducerClass(ExampleReducer.class);
            if (!job.waitForCompletion(verbose))
                return -1;
            // remove temporary files as you go, this is optional
            fs.delete(inputPath, true);
            // retrieve the counters
            long redEdges = job.getCounters().findCounter(Color.RED).getValue();
            long greenEdges = job.getCounters().findCounter(Color.GREEN).getValue();
            long blueEdges = job.getCounters().findCounter(Color.BLUE).getValue();
            System.out.println(String.format(
                    "red %d, green %d, blue %d", redEdges, greenEdges, blueEdges));
            // termination condition: no vertex changed color in this round
            long updates = job.getCounters().findCounter(State.UPDATED).getValue();
            if (updates == 0)
                break;
        }
        return 0;
    }

    private Job getPrototypeJob(int iteration) throws IOException {
        Job job = new Job(getConf(), "Iteration " + iteration);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Vertex.class);
        job.setMapOutputValueClass(Edge.class);
        job.setOutputKeyClass(Vertex.class);
        job.setOutputValueClass(Edge.class);
        return job;
    }

This snippet shows the driver program, usually realized by implementing the org.apache.hadoop.util.Tool interface and overriding the run() method. In the first part, the driver interacts with the file system to create the working directory. Then we launch a first job which serves a double purpose: it copies the input into the working directory and performs any initial preprocessing needed. Finally, in the third part, we start our iterations.
For each job, the input is the output of the previous run. The output goes into the working directory, with an increasing counter appended at the end. After each job finishes, we remove the temporary files (this is useful if we perform many iterations with many reducers and have quotas on the namenode). We also read the counters to print a summary view of the graph and to compute the termination condition.

That’s it. Not too complicated after all 🙂