Sunday, November 30, 2014

Fitting hadoop streaming into my python workflow

I got something working in Hadoop (by reading the documentation[1]) that is going to change the way I work with Wikipedia text data.  In order to give you a sense for how excited I am, I'm going to need to talk to you about the way that I approach my work with large datasets. 


My python workflow

So, I try to do as much work as I can with input/output streams.  I'm a bit of a unix geek, so that means I'm thinking in terms of standard in and standard out, and writing my analytics code as though I were supplementing the UNIX core utils.  This has nice computational practicalities and it allows you to stream operations together.


Computational practicalities

The worst bottleneck you're likely to encounter in the course of doing data analysis on large files is memory.  Most statistical environments only support in-memory analysis of datasets, to the exclusion of streaming.  This means that, in order to get any work done, you need to first copy all of your data from disk to memory.  If your dataset is larger than the available memory, you'll likely end up crashing the machine you are working with -- or at least rendering it unusable until your process finally dies.



On the other hand, python, perl and unix utilities (like cut, shuf, sort, sed, etc.) afford some powerful capabilities for working with datasets as streams, which dramatically reduces the memory footprint of an analysis.  As the diagram above makes apparent, if you can fit your data operation into a streaming job, then it doesn't matter how big your dataset is and you'll end up needing very little memory. 
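To make that concrete, here's a minimal sketch of the kind of filter I'm talking about (the script and its "upper-case the second column" transformation are made up for illustration).  It reads one tab-separated row at a time from standard in and writes the transformed row to standard out, so memory use stays flat no matter how many rows flow through.

#!/usr/bin/env python
# A toy streaming filter: reads TSV rows from stdin, writes transformed
# rows to stdout, and only ever holds one row in memory at a time.
# (Upper-casing the second column is just a placeholder transformation.)
import sys

for line in sys.stdin:
    fields = line.rstrip("\n").split("\t")
    if len(fields) > 1:
        fields[1] = fields[1].upper()
    sys.stdout.write("\t".join(fields) + "\n")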


Streaming operations together

I like Doug McIlroy's summary of the unix philosophy:
Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.
By following these principles, you can build "operators" rather than stand-alone programs.  This allows you to write less code (which is a Good Thing™) and get more done.  Recently, I wrote a paper that required me to wrangle 12 different datasets from 7 different websites -- and being able to integrate my code with the unix core utilities was a godsend.  Here's a line from my project's Makefile that I think captures the amazingness. 

(echo -e "user_id\tintertime\ttype";
bzcat datasets/originals/aol_search.tsv.bz2 |
tail -n+2 |
sort -k1,2 |
./intertimes |
sed -r "s/(.*)/\1\tapp view/" |
shuf -n 100000) >
datasets/aol_search_intertime.sample.tsv



These lines achieve the following steps of a streaming job:
  1. Print out new headers: user_id, intertime, type
  2. Decompress the aol_search.tsv.bz2 dataset
  3. Trim off the original header line
  4. Sort the dataset by the first two columns (user_id, timestamp)
  5. Compute the time between events per user (this is my python)
  6. Add a column of "app view" corresponding to the "type" header
  7. Randomly sample 100000 observations
  8. Write it all out to a new data file.  
In order to perform this complex operation over 28 million rows, I wrote ~30 lines of python code and put it in the middle of a few unix utilities that do most of the heavy lifting for me.  The process took about 2 minutes and used a couple hundred MB of memory.  The random sample that we finally arrive at contains only 100k rows and will happily load into memory in about a second.  
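The intertimes script itself isn't shown above, but here's a minimal sketch of what that kind of script might look like (this is not the actual code; the two-column layout and unix-epoch timestamps are assumptions for illustration).  It only ever remembers the previous row, which is why it streams just as happily as the utilities around it.

#!/usr/bin/env python
# Sketch of an inter-event-time script (not the actual ./intertimes code).
# Expects tab-separated (user_id, timestamp) rows, already sorted by
# user_id and then timestamp -- exactly what the sort -k1,2 step provides.
# Assumes a numeric (e.g. unix epoch) timestamp for illustration.
import sys

last_user, last_ts = None, None

for line in sys.stdin:
    user_id, ts = line.rstrip("\n").split("\t")[:2]
    ts = float(ts)
    if user_id == last_user:
        # Emit the time since this user's previous event.
        sys.stdout.write("{0}\t{1}\n".format(user_id, ts - last_ts))
    last_user, last_ts = user_id, ts

Because the sort step has already grouped each user's rows in timestamp order, a script like this never needs to hold more than two rows' worth of state.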

Bringing it together with Hadoop streaming

In case you've been living under a rock for the last 5 years or so: Hadoop is a framework for performing map/reduce operations that has become the industry standard.  Through Hadoop's streaming interface, I can make use of UNIXy streaming utilities to get my python work done faster.  Since I'm already thinking about my work as processing data one-row-at-a-time, map/reduce is not that much of a leap.

However, there's one part of my streaming scripts that is very important to me, but that I haven't told you about yet.  When I do behavioral analysis in Wikipedia, I often need to choose a dimension and process user actions in order over time.  A good example of an action in Wikipedia is an "edit".  There are two really interesting dimensions along which to process edits: page and user. 


The diagram above visualizes these two dimensions.  If we look at my edits over time, we are looking at the "user" dimension.  If we look at the article Anarchism's edits over time, we are looking at the "page" dimension.  In order to process data based on these dimensions, I'll usually sort and partition the data.  Notably, the designers of Wikipedia's XML dumps had this in mind when they put together their format.  They partition based on page and sort based on timestamp.  Yet, hadoop's native map/reduce strategy doesn't afford the ability to insist that I'll see a page's revisions or a user's edits together and in order.

Enter secondary sort.  Using org.apache.hadoop.mapred.lib.KeyFieldBasedComparator and org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, I can tell hadoop to sort and partition data before the "reduce" phase of the process.  The best part is, these guys are parameterized in the exact same way as UNIX sort.

So, let's say you want to process a user's activities over time.  Set the partitioner to divide the data based on the user_id column and tell hadoop to sort based on the revision timestamp.  Here's what the call looks like for a dataset I'm working with right now.  (first column == user_id, second column == timestamp)

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D  mapreduce.output.fileoutputformat.compress=false \
    -D  mapreduce.input.fileinputformat.split.minsize=300000000 \
    -D  mapreduce.task.timeout=6000000 \
    -D  stream.num.map.output.key.fields=2 \
    -D  mapreduce.partition.keypartitioner.options='-k1,1n' \
    -D  mapred.output.key.comparator.class="org.apache.hadoop.mapred.lib.KeyFieldBasedComparator" \
    -D  mapreduce.partition.keycomparator.options='-k1,1n -k2,2n' \
    -files       intertime.py \
    -archives    'hdfs:///user/halfak/hadoopin/virtualenv.zip#virtualenv' \
    -input       /user/halfak/enwiki_20141025.*.tsv \
    -output      "/user/halfak/hadoopin/enwiki_intertime/$(date +%Y%m%d%H%M%S)" \
    -mapper      cat \
    -reducer     intertime.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
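To give a sense for how little work the reducer has to do once that guarantee is in place, here's a minimal reducer skeleton (again, just a sketch and not the actual intertime.py; the per-user event count is a placeholder computation).  Because the partitioner groups by user_id and the comparator sorts by (user_id, timestamp), each user's rows arrive contiguously and in order, so the reducer can lazily group them as they stream by.

#!/usr/bin/env python
# Sketch of a streaming reducer that leans on secondary sort (not the
# actual intertime.py).  Rows are tab-separated and arrive grouped by
# user_id and sorted by timestamp, so itertools.groupby can consume
# each user's events lazily, one group at a time.
import sys
from itertools import groupby

def rows(stream):
    for line in stream:
        yield line.rstrip("\n").split("\t")

for user_id, user_rows in groupby(rows(sys.stdin), key=lambda r: r[0]):
    # Placeholder computation: emit how many events this user had.
    event_count = sum(1 for _ in user_rows)
    sys.stdout.write("{0}\t{1}\n".format(user_id, event_count))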

So, I'm stoked.  This will change the way I do my work.

1. Which leaves a bit to be desired[2]
2. Finding which error code to google is worse

Friday, October 17, 2014

Wikipedia's socio-technology as compared to biology

I recently gave a presentation at the Wikimedia Foundation's Research Showcase where I compared Wikipedia to a paramecium.  I'll be turning that presentation into a blog post later, but in the meantime, I wanted to share a slide from that presentation that I thought was fun.


From left to right: a small village with about 150 people, the MediaWiki software and extensions, the crowd from Wikimania'14 and Wikipedia.

Here, I equate organelles to specialized sub-systems of the greater paramecium.  In the case of Wikipedia, specialized sub-systems tend to take the form of software.

Sunday, September 28, 2014

Treading water & touching the lake bottom -- Perspective for young researchers

Last week, I went to the University of Michigan to visit Cliff Lampe and to speak at the ICOS lecture series.  In order to welcome me to Ann Arbor, Cliff set up a dinner with some of his bright-eyed young grad students.  At one point in the conversation, I off-handedly gave some advice on making it through grad school with one’s psyche intact.  It got me thinking about the sense of fear I experience when pursuing unknowns.

Getting started on a new project is a disorienting experience.  This disorientation can be terrifying and disheartening.  However, as a research project gets more mature, the notion of what questions are interesting, how they can be answered, and why those answers matter begins to form fixed points of reference.  In my experience, these conceptual footholds bring an intense sense of calm and productive purpose to a project.  They can mean the difference between feeling like one is standing before a soul-crushing abyss and feeling like one is standing on solid ground.   I've experienced something like this before grad school and I think it serves as an appropriate metaphor.

New projects are like being thrown into the middle of a lake at night.

For those of you who did not grow up in Minnesota (a.k.a. “The Land of 10,000 Lakes”), swimming in a lake is an interesting experience about which I’d like to share some insights.  I’ve sketched an attractive graphic.  


So, when you are floating in a lake and you can’t reach the bottom with your feet, you don’t have a good sense for how deep the water is.  In the dark water, your imagination of the distance between you and the lake's bottom can be your enemy.  Even if the bottom of the lake is just inches away, for all you know it could be a hundred feet down -- and filled with eels, sharks and monsters.  This could be seen as a childish way of thinking, but even today I find myself slipping into the spiral of imagined terrors when I swim in large, deep bodies of water.  

As you approach the shore, the fear doesn’t progressively subside.  Instead, the act of swimming feels a bit like fleeing, and that enhances the thoughts of imagined terrors just inches from your feet.  Faster swimming leads to more terror in a sneaky spiral that makes you want to swim for your life.

However, once your feet come into contact with the sandy bottom, the relief is immediate.  This knowledge of where the bottom is gives you perspective. You can now measure progress toward the shore.   You can stand and stop treading water.   There's no room for monsters beneath you anymore.

OK, back to research projects.  I see a lot of young grad students leading a project for the first time struggling in their deep water stage (which is totally normal) and assuming that it’s because they are somehow less competent than others (they aren't) or that their work won't lead to anything useful.  This happens all too often when students are making fine progress. They may be getting close to the shore of their research project, but they don't have a sense of any reference points yet. The unknowns still feel infinite and intractable.  For all they know, the direction they're going could be leading them to deeper water.

So far as I can tell, everyone experiences the deep water stage of a research project, no matter how senior.  Seasoned researchers have made peace with, or even celebrate, this disorienting part of research in the same way that veterans of the lake have made peace with their imagined deep-water monsters. The trick to staying sane is to recognize that sneaky spiral of fear when it happens.