
Posts Tagged ‘mapreduce’

Say you have a stream of items of large and unknown length that you can only iterate over once. Create an algorithm that randomly chooses an item from this stream such that each item is equally likely to be selected.

Algorithms Every Data Scientist Should Know: Reservoir Sampling
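A minimal sketch of the single-item case of reservoir sampling, in case it helps to see the idea in code. The class and method names are made up for illustration. The i-th item replaces the current choice with probability 1/i; it then survives every later step with probability i/n, so each of the n items ends up selected with probability 1/n.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;

// Hypothetical helper class, written for illustration only.
class ReservoirSampler {

    // Returns one item chosen uniformly at random from the stream,
    // or null if the stream is empty. Single pass, constant memory.
    static <T> T sampleOne(Iterator<T> stream, Random rng) {
        T chosen = null;
        long seen = 0;
        while (stream.hasNext()) {
            T item = stream.next();
            seen++;
            // Replace the current choice with probability 1/seen.
            // For the first item the test always succeeds.
            if (rng.nextDouble() * seen < 1.0) {
                chosen = item;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        Iterator<String> stream = Arrays.asList("a", "b", "c", "d").iterator();
        System.out.println(sampleOne(stream, new Random()));
    }
}
```

The method never needs to know the length of the stream in advance, which is the whole point of the exercise.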



It looks like people are now realizing the need for powerful real-time analytics engines. Dremel was designed by Google as an interactive query engine for cluster environments. Dremel is a scalable, interactive ad-hoc query system for analysis of read-only nested data. It’s the perfect tool to power your Big Data dashboards. Now a bunch of open source clones are appearing. Will they have the same luck as Hadoop?

Here are the contenders:

  • The regular: Apache Drill
    Mainly supported by MapR Technologies and currently in the Apache Incubator. At the time of writing there are a total of 12 issues in Jira (for comparison, we just got to 3000 in Pig this week). Even the name still needs to be confirmed. As usual in the Apache style, this will most likely be a community-driven project, with all the pros and cons of the case. The Java + Maven + Git combo should be familiar and enable contributors to get up to speed quickly.
  • The challenger: Cloudera Impala
    Just open-sourced a couple of days ago. Surprisingly, it even offers the possibility of joining tables (something Dremel didn’t do for efficiency reasons). Unfortunately it is all C++, a language I hung up after my master’s thesis. I hope this choice won’t scare away contributors. However, I suspect that Cloudera wants to drive most of the development in-house rather than build a community project.
  • The outsider: Metamarkets Druid
    Even though it has been around for a year, it has only recently become open source. It’s interesting to read how these guys were frustrated by existing technology, so they just decided to roll their own solution. My (unsupported) feeling is that this is by far the most mature of the three clones. One interesting feature of Druid is real-time ingestion of data. From what I gather, Impala relies on Hive tables and Drill on Avro files, so my guess is that neither of them can do the same. (For the record, here too it is Java + Maven + Git.)

As a technical side note, and a personal curiosity, I wonder whether these projects would benefit from YARN integration. I guess it will be easier for Drill than for the others. However, startup latency could be an issue in this case.

The whole situation seems like a déjà vu of the Giraph/Hama/GoldenOrb clones of Pregel. Who will win the favor of the Big Data crowd?
Who will be able to grow the largest community? Technical issues are only a part of the equation in this case.

I am quite excited to see this missing piece of the Big Data ecosystem getting more attention and thrilled by the competition.

PS: I have read somewhere around the Web that this will be the end of Hadoop and MapReduce. Nothing could be more wrong than this idea. Dremel is the perfect complement for MapReduce. Indeed, what better way to analyze the results of your MapReduce computation? Often the results are at least as big as the inputs, so you need a way to quickly generate small summaries. Hadoop counters have been (ab)used for this purpose, but more flexible and powerful post-processing capabilities are needed.

PPS: Just to be clear, there is nothing tremendously innovative in the science behind these products. Distributed query execution engines have been around for a while in parallel database systems. What remains to be seen is whether they can deliver on their promise of extreme scalability, which parallel database systems have failed to offer.


My job is reducing!

The Big Data scientist equivalent to…



While speaking about my latest work with some researchers, they confessed to me that they were very curious about how to implement an iterative algorithm in MapReduce/Hadoop. They made me realize that the task is not exactly straightforward, so I decided to write a post about it, and here we are.

The general idea for iterative algorithms in MapReduce is to chain multiple jobs together, using the output of the last one as the input of the next one. An important consideration is that, given the usual size of the data, the termination condition must be computed within the MapReduce program. The standard MapReduce model does not offer a simple, elegant way to do this, but Hadoop has an added feature that simplifies the task: Counters.

To check for a termination condition with Hadoop counters, you run the job and collect statistics while it is running. Then you access the counters and read their values, possibly computing derived measures from them, and finally decide whether to continue iterating or to stop.

Here is an example. The code is only partial; it is intended just to show the technique. Suppose we want to do some directed graph processing using a diffusion process (the exact application doesn’t matter; I am using a coloring example here). Assume the mapper propagates the information along the graph and the reducer computes the new state of each vertex.

    public static enum Color {
        RED, GREEN, BLUE;
    }

    public static enum State {
        UPDATED;
    }

    public static class ExampleReducer extends Reducer<Vertex, Edge, Vertex, Edge> {

        @Override
        protected void reduce(Vertex key, Iterable<Edge> inLinks, Context context)
                throws IOException, InterruptedException {
            int numRed=0, numGreen=0, numBlue=0;
            for (Edge e : inLinks) {
                switch (e.color()) {
                case RED:
                    numRed++;
                    break;
                case GREEN:
                    numGreen++;
                    break;
                case BLUE:
                    numBlue++;
                    break;
                default:
                    assert false : "Unexpected color " + e;
                }
            }
            // report per-color edge counts through Hadoop counters
            context.getCounter(Color.RED).increment(numRed);
            context.getCounter(Color.GREEN).increment(numGreen);
            context.getCounter(Color.BLUE).increment(numBlue);

            Color currentColor = key.color();
            Color newColor = computeNewColor(numRed, numGreen, numBlue);
            if (currentColor != newColor) {
                context.getCounter(State.UPDATED).increment(1);
                key.setColor(newColor);
            }
            for (Edge e : key.outLinks()) {
                e.setColor(newColor);
                context.write(key, e);
            }
        }
    }

In this snippet, we scan the incoming edges for each vertex and compute the number of edges for each color. Then we proceed to compute the new color based on some procedure (omitted for brevity) and write the updated results.
After the scan, we use the counters to report how many edges of each color we have seen. At the end of the job we will have a global view of the colors in the graph.
If we update the vertex with a new color, we report it using another counter (UPDATED). At the end of the job this counter will tell us how many updates have been performed during this round.
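The post leaves out computeNewColor, so here is one possible rule, purely as an assumption for illustration: the vertex adopts the most frequent color among its incoming edges, with ties broken in a fixed order. The ColorRule class is hypothetical, and the Color enum is repeated only to keep the sketch self-contained.

```java
// Hypothetical sketch: the original post does not show computeNewColor.
class ColorRule {

    enum Color { RED, GREEN, BLUE }

    // Assumed rule: pick the color with the highest count among the
    // incoming edges; ties are broken in the order RED, GREEN, BLUE.
    static Color computeNewColor(int numRed, int numGreen, int numBlue) {
        Color best = Color.RED;
        int bestCount = numRed;
        if (numGreen > bestCount) {
            best = Color.GREEN;
            bestCount = numGreen;
        }
        if (numBlue > bestCount) {
            best = Color.BLUE;
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(computeNewColor(1, 3, 2)); // prints GREEN
    }
}
```

A deterministic tie-break keeps reruns of the same round reproducible. Whether the whole process ever stabilizes depends on the rule you plug in here.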

Now suppose we want to run this algorithm until it stabilizes, that is, until no more updates occur. To do this, we can simply check the number of updates reported by the counter and stop when it reaches zero. This must be done in the driver program, in between successive invocations of the MapReduce job.

Another important thing for an iterative algorithm in Hadoop is defining a naming scheme for the iterations. The simplest thing is to create a working directory based on the input or output name. Every iteration will use a directory inside the working directory for input and output.

    @Override
    public int run(String[] args) throws Exception {
        int iteration = 0, result = 0;
        boolean verbose = false;
        Path inputPath = new Path(args[0]);
        Path basePath = new Path(args[1] + "_iterations");
        FileSystem fs = FileSystem.get(getConf());
        assert fs.exists(inputPath) : "Input path does not exist: " + inputPath;
        // working directory
        fs.delete(basePath, true);
        fs.mkdirs(basePath);

        // copy input into working dir
        Path outputPath = new Path(basePath, iteration + "");
        Job job0 = getPrototypeJob(iteration);
        FileInputFormat.setInputPaths(job0, inputPath);
        FileOutputFormat.setOutputPath(job0, outputPath);
        job0.setMapperClass(InitMapper.class);
        job0.setReducerClass(InitReducer.class);
        if (! job0.waitForCompletion(verbose) )
            return -1;

        boolean hasUpdates = true;
        while (hasUpdates) {
            iteration++; // iteration counter
            inputPath = outputPath; // new input is the old output
            outputPath = new Path(basePath, iteration + "");
            Job job = getPrototypeJob(iteration);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            job.setMapperClass(ExampleMapper.class);
            job.setReducerClass(ExampleReducer.class);
            // no semicolon after the condition, otherwise we would always return -1
            if (! job.waitForCompletion(verbose) )
                return -1;
            // remove temporary files as you go, this is optional
            fs.delete(inputPath, true);
            // retrieve the counters
            long redEdges = job.getCounters().findCounter(Color.RED).getValue();
            long greenEdges = job.getCounters().findCounter(Color.GREEN).getValue();
            long blueEdges = job.getCounters().findCounter(Color.BLUE).getValue();
            System.out.println(String.format(
                "red %d, green %d, blue %d", redEdges, greenEdges, blueEdges));
            long updates = job.getCounters().findCounter(State.UPDATED).getValue();
            // compute termination condition
            hasUpdates = (updates > 0);
        }
        return 0;
    }

    private Job getPrototypeJob(int iteration) throws IOException {
        Job job = new Job(getConf(), "Iteration " + iteration);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Vertex.class);
        job.setMapOutputValueClass(Edge.class);
        job.setOutputKeyClass(Vertex.class);
        job.setOutputValueClass(Edge.class);
        return job;
    }

This snippet shows the driver program, usually realized by implementing the org.apache.hadoop.util.Tool interface and overriding the run() method. In the first part the driver interacts with the file system to create the working directory. Then we launch our first job, which serves a double purpose: it copies the input into the working directory and performs any initial preprocessing needed. Finally, in the third part we start our iterations.
For each job, the input is the output of the last run. The output goes into the working directory with an increasing counter added at the end. After the job finishes we remove the temporary files (this is useful if we perform many iterations with many reducers and we have quotas on the namenode). We also access the counters to print a summary view of the graph and to compute the termination condition.

That’s it. Not too complicated after all 🙂


My latest work, “Social Content Matching in MapReduce”, has been accepted at VLDB (Very Large Data Bases).

YES!!!

(as you might tell, I am extremely happy about this 🙂 )

In the paper we tackle the problem of content distribution in a social media web site like Flickr, model it as a b-matching problem on a graph, and solve it with a smart iterative algorithm in MapReduce. We also show how to design a scalable greedy algorithm for the same problem in MapReduce.

Here is the abstract:

Matching problems are ubiquitous. They occur in economic markets, labor markets, internet advertising, and elsewhere. In this paper we focus on an application of matching for social media. Our goal is to distribute content from information suppliers to information consumers.
We seek to maximize the overall relevance of the matched content from suppliers to consumers while regulating the overall activity, e.g., ensuring that no consumer is overwhelmed with data and that all suppliers have chances to deliver their content.

We propose two matching algorithms, GreedyMR and StackMR, geared for the MapReduce paradigm. Both algorithms have provable approximation guarantees, and in practice they produce high-quality solutions. While both algorithms scale extremely well, we can show that StackMR requires only a poly-logarithmic number of MapReduce steps, making it an attractive option for applications with very large datasets. We experimentally show the trade-offs between quality and efficiency of our solutions on two large datasets coming from real-world social-media web sites.
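To make the b-matching formulation a bit more concrete, here is a single-machine sketch of the textbook greedy heuristic: scan the edges by decreasing relevance and keep an edge whenever both endpoints still have spare capacity. This is not GreedyMR or StackMR from the paper; the class, field and method names are hypothetical, and the capacity map stands in for the per-node bound b(v).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, sequential sketch of greedy b-matching on a bipartite graph.
// It is NOT the GreedyMR or StackMR algorithm described in the paper.
class GreedyBMatching {

    static final class Edge {
        final String supplier;
        final String consumer;
        final double relevance;

        Edge(String supplier, String consumer, double relevance) {
            this.supplier = supplier;
            this.consumer = consumer;
            this.relevance = relevance;
        }
    }

    // capacity maps each node to b(v), the maximum number of matched
    // edges that node accepts. Nodes missing from the map get capacity 0.
    static List<Edge> match(List<Edge> edges, Map<String, Integer> capacity) {
        List<Edge> sorted = new ArrayList<>(edges);
        sorted.sort(Comparator.comparingDouble((Edge e) -> e.relevance).reversed());

        Map<String, Integer> remaining = new HashMap<>(capacity);
        List<Edge> matching = new ArrayList<>();
        for (Edge e : sorted) {
            int s = remaining.getOrDefault(e.supplier, 0);
            int c = remaining.getOrDefault(e.consumer, 0);
            if (s > 0 && c > 0) {
                // Both endpoints have room: keep the edge and use up capacity.
                matching.add(e);
                remaining.put(e.supplier, s - 1);
                remaining.put(e.consumer, c - 1);
            }
        }
        return matching;
    }
}
```

The capacities are what keep a consumer from being overwhelmed and give every supplier a chance to deliver. The hard part addressed in the paper is doing this at scale in MapReduce, which this sketch does not attempt.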

On a final note, thanks to my co-authors for their hard work and guidance:
Aris Gionis from Yahoo! Research and Mauro Sozio from Max Planck Institut.


SMAQ vs CDC

The name SMAQ (Storage, MapReduce And Query) Cloud Stack was proposed by Edd Dumbill in this article on O’Reilly Radar (the article is certainly worth reading).

While I find that a nice name is a good way to crystallize a concept, I am not sure it captures the whole picture about Big Data.
For example, compare the image on the left from the article, with the one on the right that comes directly from my Ph.D. research proposal.

Figure (left): View of the SMAQ stack from O'Reilly Radar

Figure (right): View of the Data Intensive Cloud Computing stack from my Ph.D. research proposal

I will now dub my stack proposal CDC (Computation Data Coordination) stack (suggestions for better names super welcome!).

The query layer from SMAQ would map to my High Level Languages layer.
This layer includes systems like Pig, Hive and Cascading.

The MapReduce layer from SMAQ would map to my Computation layer.
The only other system in this layer is Dryad, for the moment, but there could be many others.

The Storage layer from SMAQ would map to my Distributed Data layer.
The SMAQ classification does not differentiate between systems like HDFS and HBase.
In my view, the high level interface is what distinguishes HDFS from HBase (or for example Voldemort from Cassandra).
That is why I would put HDFS and Voldemort in the Distributed Data layer, and HBase and Cassandra in the Data Abstraction layer (even though Cassandra does not actually rely on another system to store its data).

Finally, the SMAQ stack is totally lacking the Coordination layer.
This is understandable, as the audience of Radar is more “analyst” oriented. From an operational perspective, systems like Chubby and ZooKeeper are useful to build the frameworks in the stack.

What is missing from both stacks, and will be the main trend in 2011, is the Real-Time layer (even though I would have no idea where to put it 🙂 )


Here is a little trick I had to learn while developing Apache Pig.

Pig uses JUnit as its test framework. JUnit tests are very useful for unit testing, but end-to-end testing is not as easy, even more so in the case of Pig, which uses Hadoop (a distributed MapReduce engine) to execute its scripts. The MiniCluster class addresses this issue: it simulates a full execution environment on the local machine, with HDFS and everything you need. More information here.

MiniCluster is very easy to use, assuming you are running your tests via ant. But if you want to debug and trace your test (using Eclipse, for instance) there are a couple of catches. Basically, you need to reproduce the environment the ant script builds inside Eclipse.

The first thing to set is the hadoop.log.dir property, which tells Hadoop where to put its logs. Its default value is build/test/logs. To set it, go to the Run Configurations screen, Arguments tab, and add this line to the VM arguments:

-Dhadoop.log.dir=build/test/logs

If you forget to set this, you will get a nice NullPointerException:

ERROR mapred.MiniMRCluster: Job tracker crashed
java.lang.NullPointerException
at java.io.File.<init>(File.java:222)
at org.apache.hadoop.mapred.JobHistory.init(JobHistory.java:151)
at org.apache.hadoop.mapred.JobTracker.<init>(JobTracker.java:1617)
at org.apache.hadoop.mapred.JobTracker.startTracker(JobTracker.java:183)
at org.apache.hadoop.mapred.MiniMRCluster$JobTrackerRunner.run(MiniMRCluster.java:106)
at java.lang.Thread.run(Thread.java:619)
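As a possible alternative to the VM argument, the property could also be set from code before the mini cluster starts. The sketch below assumes that Hadoop reads hadoop.log.dir as a JVM system property at startup, which the File.<init> frame in the trace suggests but which I have not checked for every Hadoop version. The LogDirSetup class is hypothetical and is not part of Pig or Hadoop.

```java
// Hypothetical helper; not part of Pig or Hadoop.
class LogDirSetup {

    static final String KEY = "hadoop.log.dir";

    // Sets hadoop.log.dir to the given default only when it is not
    // already set, so that a -D VM argument still takes precedence.
    // Returns the value in effect after the call.
    static String ensureLogDir(String defaultDir) {
        String current = System.getProperty(KEY);
        if (current == null || current.isEmpty()) {
            System.setProperty(KEY, defaultDir);
            return defaultDir;
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(ensureLogDir("build/test/logs"));
    }
}
```

If you try this, the call would have to run before the cluster is created, for instance at the very beginning of the test setup.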

The other thing to take care of is where to find MiniCluster’s configuration file. For Pig, you should first create it by running the ant test target once from the command line. This will create a standard minimal configuration file for your use in ${HOME}/pigtest/conf. To make Eclipse find it, add this directory to the classpath in the Classpath tab, under User Entries, using the Advanced… button.

If you forget to set this, you get a nice ExecException:

org.apache.pig.backend.executionengine.ExecException: ERROR 4010: Cannot find hadoop configurations in 
 classpath (neither hadoop-site.xml nor core-site.xml was found in the classpath).If you plan to use 
 local mode, please put -x local option in command line
 at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.init(HExecutionEngine.java:149)
 at org.apache.pig.backend.hadoop.executionengine.HExecutionEngine.init(HExecutionEngine.java:114)
 at org.apache.pig.impl.PigContext.connect(PigContext.java:183)
 at org.apache.pig.PigServer.<init>(PigServer.java:216)
 at org.apache.pig.PigServer.<init>(PigServer.java:205)
 at org.apache.pig.PigServer.<init>(PigServer.java:201)
 at org.apache.pig.test.TestSecondarySort.setUp(TestSecondarySort.java:73)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:27)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:73)
 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:46)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41)
 at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173)
 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:220)
 at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:49)
 at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
 at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
 at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
 at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
 at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
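A quick way to check the classpath before launching the debugger is to look for the two file names mentioned in the error message. The sketch below uses only the standard ClassLoader.getResource call; that Pig performs its own lookup in the same way is an assumption, and the ConfigOnClasspath class is hypothetical.

```java
// Hypothetical diagnostic helper; not part of Pig or Hadoop.
class ConfigOnClasspath {

    // File names taken from the error message above.
    static final String[] CONFIG_FILES = { "hadoop-site.xml", "core-site.xml" };

    // Returns the name of the first configuration file visible to the
    // given class loader, or null if neither is found.
    static String findConfig(ClassLoader loader) {
        for (String name : CONFIG_FILES) {
            if (loader.getResource(name) != null) {
                return name;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String found = findConfig(Thread.currentThread().getContextClassLoader());
        System.out.println(found == null
                ? "no Hadoop configuration on the classpath"
                : "found " + found);
    }
}
```

Running it with the same Eclipse run configuration as the test shows whether the ${HOME}/pigtest/conf entry was picked up.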

Even after this, you will still get some exceptions (regarding threads, manifest files, jars), but they are not a problem and debugging will work.

Hope this helps!

