Posts Tagged ‘hadoop’

Algorithms Every Data Scientist Should Know: Reservoir Sampling

Say you have a stream of items of large and unknown length that you can only iterate over once. Create an algorithm that randomly chooses an item from this stream such that each item is equally likely to be selected.
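
The classic answer, which gives the post its title, is reservoir sampling with a reservoir of size one: keep the first item, then replace the current choice with the i-th item with probability 1/i. A minimal sketch in Java, with illustrative class and method names:

    import java.util.Iterator;
    import java.util.Random;

    public class ReservoirSample {

        // Keep the first item; replace the current choice with the i-th item
        // with probability 1/i. Each of the n items ends up selected with probability 1/n.
        public static <T> T pickOne(Iterator<T> stream, Random rnd) {
            T chosen = null;
            int seen = 0;
            while (stream.hasNext()) {
                T item = stream.next();
                seen++;
                if (rnd.nextInt(seen) == 0) { // true with probability 1/seen
                    chosen = item;
                }
            }
            return chosen;
        }
    }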

Read Full Post »

It looks like people are now realizing the need for powerful real-time analytics engines. Dremel was designed by Google as an interactive query engine for cluster environments: a scalable, ad-hoc query system for the analysis of read-only nested data. It’s the perfect tool to power your Big Data dashboards. Now a bunch of open source clones are appearing. Will they have the same luck as Hadoop?

Here are the contenders:

  • The regular: Apache Drill
    Mainly supported by MapR Technologies and currently in the Apache Incubator. At the time of writing there are a total of 12 issues in Jira (for comparison, we just got to 3000 in Pig this week). Even the name still needs to be confirmed. As usual with Apache projects, this will most likely be a community-driven effort, with all the pros and cons that entails. The Java + Maven + Git combo should be familiar and let contributors get up to speed quickly.
  • The challenger: Cloudera Impala
    Just open-sourced a couple of days ago. Surprisingly, it even offers the possibility of joining tables (something Dremel didn’t do for efficiency reasons). Unfortunately it is all C++, a language I hung up for good after my master’s thesis. I hope this choice won’t scare away contributors. However, I suspect that Cloudera wants to drive most of the development in-house rather than build a community project.
  • The outsider: Metamarkets Druid
    Even though it has been around for a year, it has only recently become open source. It’s interesting to read how these guys were frustrated by existing technology, so they just decided to roll their own solution. My (unsupported) feeling is that this is by far the most mature of the three clones. One interesting feature of Druid is real-time ingestion of data. From what I gather, Impala relies on Hive tables and Drill on Avro files, so my guess is that neither of them can do the same. (For the record, this one is also Java + Maven + Git.)

As a technical side note, and out of personal curiosity, I wonder whether these projects would benefit from YARN integration. I guess it will be easier for Drill than for the others. However, startup latency could be an issue in this case.

The whole situation seems like a déjà vu of the Giraph/Hama/GoldenOrb clones of Pregel. Who will win the favor of the Big Data crowd?
Who will be able to grow the largest community? Technical issues are only a part of the equation in this case.

I am quite excited to see this missing piece of the Big Data ecosystem getting more attention and thrilled by the competition.

PS: I have read somewhere around the Web that this will be the end of Hadoop and MapReduce. Nothing could be further from the truth. Dremel is the perfect complement to MapReduce. Indeed, how better could you analyze the results of your MapReduce computation? Often the results are at least as big as the inputs, so you need a way to quickly generate small summaries. Hadoop counters have been (ab)used for this purpose, but more flexible and powerful post-processing capabilities are needed.

PPS: Just to be clear, there is nothing tremendously innovative in the science behind these products. Distributed query execution engines have been around for a while in parallel database systems. What remains to be seen is whether they can deliver on their promise of extreme scalability, which parallel database systems have failed to offer.

Read Full Post »

I have recently started to look at the BSP (Bulk Synchronous Parallel) model of computation for large scale graph analysis.
The model itself is quite old, but it has seen a resurgence lately, mainly because of the Pregel system by Google. Here is the paper.
The idea of Pregel is to use synchronous barriers to cleanly separate messages in rounds.
In each step, a vertex can read the messages sent to it in the previous step, perform some computation, and generate messages for the next step.
When all the vertices vote to halt and no more messages arrive, the computation stops.
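
To make the model concrete, here is a minimal sketch of a vertex program in this style. The Vertex class and its methods are illustrative assumptions, not the API of any specific system:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative Pregel-style vertex: user code goes in compute(); the runtime
    // delivers messages, applies the barrier, and manages halting.
    abstract class Vertex<V, M> {
        protected V value;
        protected long superstep = 0;                 // set by the runtime each round
        private boolean halted = false;
        private final List<M> outbox = new ArrayList<M>();

        // Called once per superstep with the messages sent in the previous superstep.
        public abstract void compute(Iterable<M> messages);

        // Stubs standing in for the (hypothetical) runtime.
        protected void sendMessageToAllNeighbors(M message) { outbox.add(message); }
        protected void voteToHalt() { halted = true; }
    }

    // Example: every vertex converges to the maximum value in the graph.
    class MaxValueVertex extends Vertex<Integer, Integer> {
        MaxValueVertex(int initialValue) { this.value = initialValue; }

        @Override
        public void compute(Iterable<Integer> messages) {
            boolean changed = (superstep == 0);       // announce our value in the first round
            for (int m : messages) {
                if (m > value) { value = m; changed = true; }
            }
            if (changed) {
                sendMessageToAllNeighbors(value);     // delivered in the next superstep
            }
            voteToHalt();                             // reactivated if a new message arrives
        }
    }

The runtime (not shown) would deliver the outbox contents after the barrier and stop once every vertex has voted to halt and no messages remain, exactly as described above.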

As with MapReduce and Hadoop, a number of clones of Pregel have popped up.
I will briefly review the most interesting ones in this post.

 

Apache Giraph

Giraph is a Hadoop-based graph processing engine open sourced by Yahoo! and now in the Apache Incubator.
It is at a very early stage, but the community looks very active and diverse. Giraph developers play in the same backyard as the Hadoop, Pig, and Hive developers, and this makes me feel confident about its future.
Giraph uses Hadoop to schedule its workers and to get the data loaded in memory, but then implements its own communication protocol using HadoopRPC. It also uses Zookeeper for fault tolerance via periodic checkpointing.
Interestingly, Giraph does not require installing anything on the cluster, so you can try it out on your existing Hadoop infrastructure. A Giraph job runs like a normal map-only job. It can actually be thought of as a graph-specific Hadoop library.
However, because of this, the API currently lacks encapsulation. Users need to write a custom Reader + Writer + InputFormat + OutputFormat for each Vertex program they create. A library to read common graph formats would be a nice addition.

The good:
Very active community with people from the Hadoop ecosystem.
Runs directly on Hadoop.

The bad:
Early stage.
Hadoop leaks in the API.

 

Apache Hama

Hama is a Hadoop-like solution and the oldest member of the group. It is currently in the Apache Incubator and was initially developed by an independent Korean programmer.
Hama focuses on general BSP computations, so it is not only for graphs. For example, there are algorithms for matrix inversion and linear algebra (I know, one could argue that a graph and a matrix are actually the same data structure).
Unfortunately, the project seems to be moving slowly, even though there has lately been a spike of activity, probably driven mainly by GSoC (it works!).
It is still at a very early stage: the current version doesn’t provide a proper I/O API or data partitioner. From my understanding, there is no fault tolerance either.
From the technical point of view, Hama uses Zookeeper for coordination and HDFS for data persistence. It is designed to be “The Hadoop of BSP”.
Given its focus on general BSP, the primitives it provides are at a low level of abstraction, very much like a restricted version of MPI.
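
To give a sense of what this low level of abstraction looks like, here is a sketch of the send/sync/receive programming style. The BspPeer interface and its methods below are illustrative assumptions, not Hama's actual API:

    // Illustrative BSP primitives: a superstep is local computation, a batch of
    // message sends, and a global synchronization barrier.
    interface BspPeer<M> {
        void send(String peerName, M message);   // delivered after the next barrier
        M nextMessage();                         // message from the previous superstep, null when drained
        Iterable<String> allPeerNames();
        void sync() throws InterruptedException; // global barrier
    }

    class SumExample {
        // Each peer broadcasts its local value, then sums everything it received.
        static long superstep(BspPeer<Long> peer, long localValue) throws InterruptedException {
            for (String other : peer.allPeerNames()) {
                peer.send(other, localValue);
            }
            peer.sync(); // barrier: all sends become visible as incoming messages
            long total = 0;
            Long m;
            while ((m = peer.nextMessage()) != null) {
                total += m;
            }
            return total;
        }
    }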

The good:
General BSP.
Complete system.
The logo is cute.

The bad:
Early stage.
Requires additional infrastructure.
Graph processing library not yet released.

 

GoldenOrb

GoldenOrb is a Hadoop-based Pregel clone open sourced by Ravel. It should be in the process of getting into the Apache Incubator.
It is a close clone of Pregel and very similar to Giraph: a system to run distributed iterative algorithms on graphs.
The components of the system like vertex values (state), edges and messages are built on top of the Hadoop Writables system.
One thing I don’t understand is why they didn’t leverage Java generics. To get the value of a message you need to do ugly explicit casts:

@Override
public void compute(Collection<IntMessage> messages) {
  int _maxValue = 0;
  for (IntMessage m : messages) {
    // the message value is not typed, hence the explicit cast to IntWritable
    int msgValue = ((IntWritable) m.getMessageValue()).get();
    _maxValue = Math.max(_maxValue, msgValue);
  }
}
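
For comparison, here is a sketch of how a generic message type could remove the cast. The TypedMessage interface is hypothetical and not part of GoldenOrb:

    // Hypothetical generic message type (not part of GoldenOrb): the payload type
    // lives in the signature, so no cast is needed at the call site.
    interface TypedMessage<T> {
        T getMessageValue();
    }

    class TypedIntMessage implements TypedMessage<Integer> {
        private final int value;
        TypedIntMessage(int value) { this.value = value; }
        @Override
        public Integer getMessageValue() { return value; }
    }

    // compute() could then read the value directly:
    //   int msgValue = m.getMessageValue();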

As with Giraph, Hadoop details leak into the API. However, unlike Giraph, GoldenOrb requires additional infrastructure to run: an OrbTracker needs to be running on each Hadoop node. It also uses HadoopRPC for communication. The administrator can define the number of partitions to assign to each OrbTracker, and the number of threads to launch per node can be configured on a per-job basis.

The good:
Commercial support.

The bad:
Early stage, not yet in Incubator.
API has rough edges.
Requires additional infrastructure.

 

JPregel

A final mention goes to JPregel. It is a Java clone of Pregel which is not Hadoop-based.
The programming interface has been designed from the ground up and is very clean.
Right now it is at a very, very early stage of development: for example, messages can only be doubles and the system itself is not yet finalized.
Interestingly, no explicit halting primitive is present. From what I understand, it automatically deduces termination from the absence of new messages.
It is a very nice piece of work, especially considering that it has been done by just a couple of first-year master's students.
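
As an aside, inferring termination that way is simple to sketch. A hypothetical driver loop (not JPregel's actual API) could stop as soon as a superstep produces no messages:

    // Hypothetical sketch: with no explicit halting primitive, the driver infers
    // termination from the absence of messages in a superstep.
    class ImplicitTerminationDriver {

        interface Superstep {
            // Runs one superstep and returns how many messages the vertices sent.
            long run(long superstepNumber);
        }

        static void runToCompletion(Superstep step) {
            long superstepNumber = 0;
            long messagesSent;
            do {
                messagesSent = step.run(superstepNumber++);
            } while (messagesSent > 0); // no messages sent means nothing left to do
        }
    }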

 

PS: I waited too long before publishing this post and was beaten to it by one of the Giraph committers, even though he also reviews some other pieces of software. Have a look at his post.

Read Full Post »

While speaking about my latest work with some researchers, they confessed to me that they were very curious about how to implement an iterative algorithm in MapReduce/Hadoop. They made me realize that the task is not exactly straightforward, so I decided to write a post about it, and here we are.

The general idea for iterative algorithms in MapReduce is to chain multiple jobs together, using the output of one job as the input of the next. An important consideration is that, given the usual size of the data, the termination condition must be computed within the MapReduce program itself. The standard MapReduce model does not offer a simple, elegant way to do this, but Hadoop has an added feature that simplifies the task: Counters.

To check for a termination condition with Hadoop counters, you run the job and collect statistics while it's running. Then you access the counters and read their values, possibly compute derived measures from them, and finally decide whether to continue iterating or to stop.

Here is an example. The code is only partial; it's intended just to show the technique. Suppose we want to do some directed graph processing using a diffusion process (the exact application doesn't matter, I am using a coloring example here). Assume the mapper propagates the information along the graph and the reducer computes each vertex's new color.

    public static enum Color {
        RED, GREEN, BLUE;
    }

    public static enum State {
        UPDATED;
    }

    public static class ExampleReducer extends Reducer<Vertex, Edge, Vertex, Edge> {

        @Override
        protected void reduce(Vertex key, Iterable<Edge> inLinks, Context context) {
            int numRed=0, numGreen=0, numBlue=0;
            for (Edge e : inLinks) {
                switch (e.color()) {
                case RED:
                    numRed++;
                    break;
                case GREEN:
                    numGreen++;
                    break;
                case BLUE:
                    numBlue++;
                    break;
                default:
                    assert false : "Unexpected color " + e;
                }
            }
            context.getCounter(RED).increment(numRed);
            context.getCounter(GREEN).increment(numGreen);
            context.getCounter(BLUE).increment(numBlue);

            Color currentColor = key.color();
            Color newColor = computeNewColor(numRed, numGreen, numBlue);
            if (currentColor != newColor) {
                context.getCounter(UPDATED).increment(1);
                key.setColor(newColor);
            }
            for (Edge e : key.outLinks()) {
                e.setColor(newColor);
                context.write(key, e);
            }
        }

In this snippet, we scan the incoming edges for each vertex and compute the number of edges for each color. Then we proceed to compute the new color based on some procedure (omitted for brevity) and write the updated results.
After the scan, we use the counters to report how many edges of each color we have seen. At the end of the job we will have a global view of the colors in the graph.
If we update the vertex with a new color, we report it using another counter (UPDATED). At the end of the job this counter will tell us how many updates have been performed during this round.

Now suppose we want to run this algorithm up to stabilization, that is up to the point where no more updates will occur. To do this, we can simply check the number of updates reported by the counter and stop when it reaches zero. This must be done in the driver program, in between successive invocations of the MapReduce job.

Another important thing for an iterative algorithm in Hadoop is defining a naming schema for the iterations. The simplest thing is to create a working directory based on the input or output name. Every iteration will use a directory inside the working directory for input and output.

    @Override
    public int run(String[] args) throws Exception {
        int iteration = 0, result = 0;
        boolean verbose = false;
        Path inputPath = new Path(args[0]);
        Path basePath = new Path(args[1] + "_iterations");
        FileSystem fs = FileSystem.get(getConf());
        assert fs.exists(inputPath) : "Input path does not exist: " + inputPath;
        // working directory
        fs.delete(basePath, true);
        fs.mkdirs(basePath);

        // copy input into working dir
        Path outputPath = new Path(basePath, iteration + "");
        Job job0 = getPrototypeJob(iteration);
        FileInputFormat.setInputPaths(job0, inputPath);
        FileOutputFormat.setOutputPath(job0, outputPath);
        job0.setMapperClass(InitMapper.class);
        job0.setReducerClass(InitReducer.class);
        if (! job0.waitForCompletion(verbose) )
            return -1;

        boolean hasUpdates = true;
        while (hasUpdates) {
            iteration++; // iteration counter
            inputPath = outputPath; // new input is the old output
            outputPath = new Path(basePath, iteration + "");
            Job job = getPrototypeJob(iteration);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            job.setMapperClass(ExampleMapper.class);
            job.setReducerClass(ExampleReducer.class);
            if (! job.waitForCompletion(verbose) )
                return -1;
            // remove temporary files as you go, this is optional
            fs.delete(inputPath, true);
            // retrieve the counters
            long redEdges = job.getCounters().findCounter(RED).getValue();
            long greenEdges = job.getCounters().findCounter(GREEN).getValue();
            long blueEdges = job.getCounters().findCounter(BLUE).getValue();
            System.out.println(String.format(
                "red %d, green%d, blue %d", redEdges, greenEdges, blueEdges));
            long updates = job.getCounters().findCounter(UPDATED).getValue();
            // compute termination condition
            hasUpdates = (updates > 0);
        }
        return 0;
    }

    private Job getPrototypeJob(int iteration) throws IOException {
        Job job = new Job(getConf(), "Iteration " + iteration);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Vertex.class);
        job.setMapOutputValueClass(Edge.class);
        job.setOutputKeyClass(Vertex.class);
        job.setOutputValueClass(Edge.class);
        return job;
    }

This snippet shows the driver program, usually realized by implementing the org.apache.hadoop.util.Tool interface and overriding the run() method. In the first part, the driver interacts with the file system to create the working directory. Then we launch our first job, which serves a double purpose: it copies the input into the working directory and performs any initial preprocessing needed. Finally, in the third part, we start our iterations.
For each job, the input is the output of the previous run. The output goes into the working directory, with an increasing counter appended at the end. After the job finishes we remove the temporary files (this is useful if we perform many iterations with many reducers and we have quotas on the namenode). We also access the counters to print a summary view of the graph and to compute the termination condition.

That’s it. Not too complicated after all 🙂

Read Full Post »