
Archive for the ‘PhD’ Category

http://blog.s4.io/2011/08/s4-0-3-0-released/

Read Full Post »

Grokking data

  • When you have been exploring a dataset for a while, studying its distribution, its composition, its quirks, its innards and its “essence” in the end.
  • When you know the answer to a query even before performing it.
  • When you have a strict hierarchical organization of the folders for plots.
  • When you know by heart the number of unique URLs and usernames in the dataset.
  • When your methodology to name files according to their schema has become more complex than the schemas themselves.
  • When you have five different versions of the dataset, but you forgot the reason behind four of them.
  • When the size of the scripts to analyze the dataset begins to rival the dataset itself.
  • Ultimately, when the mere thought of putting your hands on that data again gives you urticaria.
That’s when you grokked the data.
Now imagine doing that on hundreds of gigabytes…

Read Full Post »

While I was discussing my latest work with some researchers, they confessed to me that they were very curious about how to implement an iterative algorithm in MapReduce/Hadoop. They made me realize that the task is not exactly straightforward, so I decided to write a post about it, and here we are.

The general idea for iterative algorithms in MapReduce is to chain multiple jobs together, using the output of the last one as the input of the next one. An important consideration is that, given the usual size of the data, the termination condition must be computed within the MapReduce program. The standard MapReduce model does not offer a simple, elegant way to do this, but Hadoop has an added feature that simplifies the task: Counters.

To check for a termination condition with Hadoop counters, you run the job and collect statistics while it's running. Then you access the counters and get their values. You might also compute derived measures from the counters, and finally you decide whether to continue iterating or to stop.

Here is an example. The code is only partial; it's intended just to show the technique. Suppose we want to do some directed graph processing using a diffusion process (the exact application doesn’t matter, I am using a coloring example here). Assume the mapper propagates the information along the graph and the reducer computes the new one.

    public static enum Color {
        RED, GREEN, BLUE;
    }

    public static enum State {
        UPDATED;
    }

    public static class ExampleReducer extends Reducer<Vertex, Edge, Vertex, Edge> {

        @Override
        protected void reduce(Vertex key, Iterable<Edge> inLinks, Context context)
                throws IOException, InterruptedException {
            int numRed=0, numGreen=0, numBlue=0;
            for (Edge e : inLinks) {
                switch (e.color()) {
                case RED:
                    numRed++;
                    break;
                case GREEN:
                    numGreen++;
                    break;
                case BLUE:
                    numBlue++;
                    break;
                default:
                    assert false : "Unexpected color " + e;
                }
            }
            // report the per-color edge counts through Hadoop counters
            context.getCounter(Color.RED).increment(numRed);
            context.getCounter(Color.GREEN).increment(numGreen);
            context.getCounter(Color.BLUE).increment(numBlue);

            Color currentColor = key.color();
            Color newColor = computeNewColor(numRed, numGreen, numBlue);
            if (currentColor != newColor) {
                context.getCounter(State.UPDATED).increment(1);
                key.setColor(newColor);
            }
            for (Edge e : key.outLinks()) {
                e.setColor(newColor);
                context.write(key, e);
            }
        }
    }

In this snippet, we scan the incoming edges for each vertex and compute the number of edges for each color. Then we proceed to compute the new color based on some procedure (omitted for brevity) and write the updated results.
After the scan, we use the counters to report how many edges of each color we have seen. At the end of the job we will have a global view of the colors in the graph.
If we update the vertex with a new color, we report it using another counter (UPDATED). At the end of the job this counter will tell us how many updates have been performed during this round.
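The post leaves computeNewColor() out, so the following is only a hypothetical stand-in to make the reducer concrete: a majority vote over the incoming edge colors. The class name MajorityColor, the voting rule, and the tie-break order are all assumptions, not the procedure used in the original code.

```java
// Hypothetical stand-in for the computeNewColor() procedure the post omits:
// the vertex adopts the color that is most common among its incoming edges.
final class MajorityColor {

    enum Color { RED, GREEN, BLUE }

    // Ties are broken in the fixed order RED, GREEN, BLUE so that the result
    // is deterministic. This tie-break rule is an assumption; note that it
    // also means a call with all counts equal to zero returns RED.
    static Color computeNewColor(int numRed, int numGreen, int numBlue) {
        Color best = Color.RED;
        int bestCount = numRed;
        if (numGreen > bestCount) {
            best = Color.GREEN;
            bestCount = numGreen;
        }
        if (numBlue > bestCount) {
            best = Color.BLUE;
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(computeNewColor(1, 3, 2)); // clear majority
        System.out.println(computeNewColor(2, 2, 0)); // tie, first in order wins
    }
}
```

Any deterministic function of the three counts would fit the reducer equally well; what matters for the technique is only that the reducer can compare the old and new color and bump the UPDATED counter when they differ.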

Now suppose we want to run this algorithm up to stabilization, that is up to the point where no more updates will occur. To do this, we can simply check the number of updates reported by the counter and stop when it reaches zero. This must be done in the driver program, in between successive invocations of the MapReduce job.

Another important thing for an iterative algorithm in Hadoop is defining a naming schema for the iterations. The simplest thing is to create a working directory based on the input or output name. Every iteration will use a directory inside the working directory for input and output.

    @Override
    public int run(String[] args) throws Exception {
        int iteration = 0;
        boolean verbose = false;
        Path inputPath = new Path(args[0]);
        Path basePath = new Path(args[1] + "_iterations");
        FileSystem fs = FileSystem.get(getConf());
        assert fs.exists(inputPath) : "Input path does not exist: " + inputPath;
        // working directory
        fs.delete(basePath, true);
        fs.mkdirs(basePath);

        // copy input into working dir
        Path outputPath = new Path(basePath, iteration + "");
        Job job0 = getPrototypeJob(iteration);
        FileInputFormat.setInputPaths(job0, inputPath);
        FileOutputFormat.setOutputPath(job0, outputPath);
        job0.setMapperClass(InitMapper.class);
        job0.setReducerClass(InitReducer.class);
        if (! job0.waitForCompletion(verbose) )
            return -1;

        boolean hasUpdates = true;
        while (hasUpdates) {
            iteration++; // iteration counter
            inputPath = outputPath; // new input is the old output
            outputPath = new Path(basePath, iteration + "");
            Job job = getPrototypeJob(iteration);
            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);
            job.setMapperClass(ExampleMapper.class);
            job.setReducerClass(ExampleReducer.class);
            if (! job.waitForCompletion(verbose) )
                return -1;
            // remove temporary files as you go, this is optional
            fs.delete(inputPath, true);
            // retrieve the counters
            long redEdges = job.getCounters().findCounter(Color.RED).getValue();
            long greenEdges = job.getCounters().findCounter(Color.GREEN).getValue();
            long blueEdges = job.getCounters().findCounter(Color.BLUE).getValue();
            System.out.println(String.format(
                "red %d, green %d, blue %d", redEdges, greenEdges, blueEdges));
            long updates = job.getCounters().findCounter(State.UPDATED).getValue();
            // compute termination condition
            hasUpdates = (updates > 0);
        }
        return 0;
    }

    private Job getPrototypeJob(int iteration) throws IOException {
        Job job = new Job(getConf(), "Iteration " + iteration);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Vertex.class);
        job.setMapOutputValueClass(Edge.class);
        job.setOutputKeyClass(Vertex.class);
        job.setOutputValueClass(Edge.class);
        return job;
    }

This snippet shows the driver program, usually realized by implementing the org.apache.hadoop.util.Tool interface and overriding the run() method. In the first part, the driver interacts with the file system to create the working directory. Then we launch our first job, which serves a double purpose: it copies the input into the working directory and performs any initial preprocessing needed. Finally, in the third part, we start our iterations.
For each job, the input is the output of the last run. The output goes into the working directory with an increasing counter added at the end. After the job finishes we remove the temporary files (this is useful if we perform many iterations with many reducers and we have quotas on the namenode). We also access the counters to print a summary view of the graph and to compute the termination condition.
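The control flow of the driver can also be sketched without a cluster. In the sketch below, the runRound function is a hypothetical placeholder for "launch the job for this iteration and read the UPDATED counter". The maxIterations cap is not in the driver above; it is an added safeguard, on the assumption that a diffusion process may oscillate instead of stabilizing.

```java
import java.util.function.IntToLongFunction;

// Cluster-free sketch of the driver loop: iterate until a round reports
// zero updates, with an assumed safety cap on the number of rounds.
final class IterationLoop {

    // runRound stands in for running one MapReduce job and reading its
    // UPDATED counter; it returns the number of updates made in that round.
    // Returns the number of rounds executed, or -1 if maxIterations rounds
    // ran without ever reaching zero updates.
    static int iterateUntilStable(IntToLongFunction runRound, int maxIterations) {
        for (int iteration = 1; iteration <= maxIterations; iteration++) {
            long updates = runRound.applyAsLong(iteration);
            if (updates == 0) {
                return iteration; // stabilized: no vertex changed color
            }
        }
        return -1; // cap reached without stabilizing
    }

    public static void main(String[] args) {
        // Toy round whose update count shrinks by two each time: 5, 3, 1, 0.
        IntToLongFunction shrinking = i -> Math.max(0, 7 - 2 * i);
        System.out.println(iterateUntilStable(shrinking, 10));

        // Toy round that never stabilizes; the cap stops the loop.
        IntToLongFunction oscillating = i -> 1;
        System.out.println(iterateUntilStable(oscillating, 10));
    }
}
```

In a real driver the cap would sit next to the hasUpdates check, so the loop ends on whichever condition is met first.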

That’s it. Not too complicated after all 🙂

Read Full Post »

A much more objective article on NoSQL storage systems by Stonebraker.

10 Rules for Scalable Performance in ‘Simple Operation’ Datastores

Most of the rules are actually long held beliefs in the NoSQL community.
You can find most of the advice he gives in my Ph.D. proposal as well.

Read Full Post »

Ambiguous dedication

To my advisor, for whom no thanks is too much.

Anonymous frustrated Ph.D. student

Read Full Post »

Tough environment

At Yahoo! Research, you need a Ph.D. to get a cubicle.
With a Master’s you get at most a small desk!

Read Full Post »

Wikipedia Miner

I have been playing with Wikipedia Miner for my new research project. Wikipedia Miner is a toolkit that does many interesting things with Wikipedia. The one I am using is “wikification”, that is “The process of adding wiki links to specific named entities and other appropriate phrases in an arbitrary text.” It is a very useful procedure to enrich a text. “The process consists of automatic keyword extraction, word sense disambiguation, and automatically adding links to documents to Wikipedia”. In my case, I am more interested in topic detection, so I care only about the first two phases (keyword extraction and word sense disambiguation).

Even though the software is a great piece of work, and the online demo works flawlessly, setting it up locally is a nightmare. The main reason is the very limited documentation. The main problem is that the Requirements section is missing all the version numbers for the required software.

To spare others the same ordeal, here is what I discovered about setting up Wikipedia Miner.

  1. MySQL. You can use any version, but beware if you use version 4. Varchars over 255 in length are automatically converted to the smallest text field that can contain them. Because text fields cannot be fully indexed, you need to specify how much of the field to index. Otherwise you will get this nice exception: “java.sql.SQLException: Syntax error or access violation message from server: BLOB/TEXT column used in key specification without a key length”. Therefore, to make it work, add the index length (the “(300)” after an_text and ao_text in the PRIMARY KEY clauses below) to WikipediaDatabase.java:150 and recompile.


    createStatements.put("anchor", "CREATE TABLE anchor ("
    + "an_text varchar(300) binary NOT NULL, "
    + "an_to int(8) unsigned NOT NULL, "
    + "an_count int(8) unsigned NOT NULL, "
    + "PRIMARY KEY (an_text(300), an_to), "
    + "KEY (an_to)) ENGINE=MyISAM DEFAULT CHARSET=utf8;") ;


    createStatements.put("anchor_occurance", "CREATE TABLE anchor_occurance ("
    + "ao_text varchar(300) binary NOT NULL, "
    + "ao_linkCount int(8) unsigned NOT NULL, "
    + "ao_occCount int(8) unsigned NOT NULL, "
    + "PRIMARY KEY (ao_text(300))) ENGINE=MyISAM DEFAULT CHARSET=utf8;");

  2. Connector/J. Use version 3.0.17 or set the property jdbcCompliantTruncation=false. If you don’t, you will get a nice “com.mysql.jdbc.MysqlDataTruncation: Data truncation” exception.
  3. Weka. Use version 3.6.4, otherwise you will get deserialization exceptions when loading the models (in my case “java.io.InvalidClassException: weka.classifiers.Classifier; local class incompatible: stream classdesc serialVersionUID = 6502780192411755341, local class serialVersionUID = 66924060442623804”).
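As a minimal sketch of the Connector/J setting from item 2, the property can be appended to the JDBC URL or passed as a connection property. The host, port, database name, and credentials below are placeholders, and how Wikipedia Miner itself receives its connection settings is not covered here, so treat this as an illustration of the property only.

```java
import java.util.Properties;

// Sketch of two ways to pass jdbcCompliantTruncation=false to Connector/J.
// The class name and all connection details are hypothetical placeholders.
final class MinerJdbcConfig {

    // Option 1: append the property to the JDBC URL as a query parameter.
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:mysql://" + host + ":" + port + "/" + database
                + "?jdbcCompliantTruncation=false";
    }

    // Option 2: supply it as a connection property next to the credentials.
    static Properties connectionProperties(String user, String password) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        props.setProperty("jdbcCompliantTruncation", "false");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host and database name, for illustration only.
        System.out.println(jdbcUrl("localhost", 3306, "wikipedia"));
    }
}
```

Either option on its own should be enough. With the driver on the classpath, the URL or the Properties object would be handed to java.sql.DriverManager.getConnection.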

So far I haven’t had any problems with trove and servlet-api.

This confirms a well-known fact: one of the biggest problems of open source is the lack of documentation. I should learn from this experience as well 🙂

I hope this saves somebody a few hours of work.

And kudos to David Milne for creating and releasing Wikipedia Miner as open source.
This is the proper way to do science!

Read Full Post »

Data Intensive Scalable Computing
DISC=Data Intensive Scalable Computing

ML=Machine Learning
DM=Data Mining
IR=Information Retrieval
DS=Distributed Systems
DB=Data Bases

Read Full Post »

My latest work “Social Content Matching in MapReduce” was accepted at VLDB
(Very Large Data Bases).

YES!!!

(as you might tell, I am extremely happy about this 🙂 )

In the paper we tackle the problem of content distribution in a social media web site like Flickr, model it as a b-matching problem on a graph, and solve it with a smart iterative algorithm in MapReduce. We also show how to design a scalable greedy algorithm for the same problem in MapReduce.

Here is the abstract:

Matching problems are ubiquitous. They occur in economic markets, labor markets, internet advertising, and elsewhere. In this paper we focus on an application of matching for social media. Our goal is to distribute content from information suppliers to information consumers.
We seek to maximize the overall relevance of the matched content from suppliers to consumers while regulating the overall activity, e.g., ensuring that no consumer is overwhelmed with data and that all suppliers have chances to deliver their content.

We propose two matching algorithms, GreedyMR and StackMR, geared for the MapReduce paradigm. Both algorithms have provable approximation guarantees, and in practice they produce high-quality solutions. While both algorithms scale extremely well, we can show that StackMR requires only a poly-logarithmic number of MapReduce steps, making it an attractive option for applications with very large datasets. We experimentally show the trade-offs between quality and efficiency of our solutions on two large datasets coming from real-world social-media web sites.

On a final note, thanks to my co-authors for their hard work and guidance:
Aris Gionis from Yahoo! Research and Mauro Sozio from Max Planck Institut.

Read Full Post »

SMAQ vs CDC

The name SMAQ (Storage MapReduce And Query) Cloud Stack was proposed by Edd Dumbill in this article on O’Reilly Radar (the article is definitely worth reading).

While I find that a nice name is a good way to crystallize a concept, I am not sure it captures the whole picture about Big Data.
For example, compare the image on the left from the article, with the one on the right that comes directly from my Ph.D. research proposal.

The SMAQ stack for Big Data

View of the SMAQ stack from O'Reilly Radar

Cloud Computing Stack

View of the Data Intensive Cloud Computing stack from my Ph.D. research proposal

I will now dub my stack proposal CDC (Computation Data Coordination) stack (suggestions for better names super welcome!).

The query layer from SMAQ would map to my High Level Languages layer.
This layer includes systems like Pig, Hive and Cascading.

The MapReduce layer from SMAQ would map to my Computation layer.
The only other system in this layer is Dryad, for the moment, but there could be many others.

The Storage layer from SMAQ would map to my Distributed Data layer.
The SMAQ classification does not differentiate between systems like HDFS and HBase.
In my view, the high level interface is what distinguishes HDFS from HBase (or for example Voldemort from Cassandra).
That is why I would put HDFS and Voldemort in the Distributed Data layer, and HBase and Cassandra in the Data Abstraction layer (even though Cassandra does not actually rely on another system to store its data).

Finally, the SMAQ stack is totally lacking the Coordination layer.
This is understandable, as the audience of Radar is more “analyst” oriented. From an operational perspective, systems like Chubby and ZooKeeper are useful to build the frameworks in the stack.

What is missing from both stacks, and will be the main trend in 2011, is the Real-Time layer (even though I would have no idea where to put it 🙂 )

Read Full Post »
