Sunday, November 30, 2014

Fitting hadoop streaming into my python workflow

I got something working in Hadoop (by reading the documentation[1]) that is going to change the way I work with Wikipedia text data.  In order to give you a sense for how excited I am, I'm going to need to talk to you about the way that I approach my work with large datasets. 


My python workflow

So, I try to do as much work as I can with input/output streams.  I'm a bit of a unix geek, so that means I'm thinking about standard-in and standard-out and writing my analytics code as though I were supplementing UNIX core utils.  This has nice computational practicalities and it allows you to stream operations together.


Computational practicalities

The worst bottleneck you're likely to encounter in the course of doing data analysis on large files is memory.  Most statistical environments support only in-memory analysis of datasets, to the exclusion of streaming.  This means that, in order to get any work done, you first need to copy all of your data from disk to memory.  If your dataset is larger than the available memory, you'll likely crash the machine you're working with, or at least render it unusable until your process finally dies.



On the other hand, python, perl, and unix utilities (like cut, shuf, sort, sed, etc.) afford powerful capabilities for working with datasets as streams, and can therefore dramatically reduce the memory footprint of an analysis.  As the diagram above makes apparent, if you can fit your data operation into a streaming job, then it doesn't matter how big your dataset is; you'll end up needing very little memory.
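To make that concrete, here's a minimal sketch of the kind of stream filter I'm talking about (the column handling is just a placeholder, not any particular script of mine):

import sys

# Read one tab-separated row at a time from stdin and write results to stdout.
# Memory use stays flat no matter how many rows flow through the pipe.
for line in sys.stdin:
    row = line.rstrip("\n").split("\t")
    # ... do some per-row computation on `row` here ...
    sys.stdout.write("\t".join(row) + "\n")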


Streaming operations together

I like Doug McIlroy's summary of the unix philosophy:
Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.
By following these principles, you can build "operators" rather than stand-alone programs.  This allows you to write less code (which is a Good Thing™) and get more done.  Recently, I wrote a paper that required me to wrangle 12 different datasets from 7 different websites, and being able to integrate my code with unix core utilities was a godsend.  Here's a line from my project's Makefile that I think captures the amazingness.

(echo -e "user_id\tintertime\ttype";
bzcat datasets/originals/aol_search.tsv.bz2 |
tail -n+2 |
sort -k1,2 |
./intertimes |
sed -r "s/(.*)/\1\tapp view/" |
shuf -n 100000) > datasets/aol_search_intertime.sample.tsv



These lines achieve the following steps of a streaming job:
  1. Print out new headers: user_id, intertime, type
  2. Decompress the aol_search.tsv.bz2 dataset
  3. Trim off the original header line
  4. Sort the dataset by the first two columns (user_id, timestamp)
  5. Compute the time between events per user (this is my python)
  6. Add a column of "app view" corresponding to the "type" header
  7. Randomly sample 100000 observations
  8. Write it all out to a new data file.  
In order to perform this complex operation over 28 million rows, I wrote ~30 lines of python code and put it in the middle of a few unix utilities that do most of the heavy lifting for me.  The whole pipeline took about 2 minutes and used a couple hundred MB of memory.  The random sample that we finally arrive at contains only 100k rows and will happily load into memory in about a second.
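I haven't included the intertimes script here, but conceptually it looks something like this sketch (assuming, as the sort step guarantees, that rows arrive ordered by user_id and then timestamp, and that timestamps are unix seconds):

import sys

last_user = None
last_timestamp = None
for line in sys.stdin:
    user_id, timestamp = line.rstrip("\n").split("\t")[:2]
    timestamp = int(timestamp)
    if user_id == last_user:
        # Same user as the previous row, so emit the time since their last event.
        sys.stdout.write("{0}\t{1}\n".format(user_id, timestamp - last_timestamp))
    last_user, last_timestamp = user_id, timestamp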

Bringing it together with Hadoop streaming

In case you've been living under a rock for the last 5 years or so: Hadoop is a framework for performing map/reduce operations that has become the industry standard.  Through Hadoop's streaming interface, I can make use of UNIXy streaming utilities to get my python work done faster.  Since I'm already thinking about my work as processing data one row at a time, map/reduce is not that much of a leap.

However, there's one part of my streaming scripts that is very important to me that I haven't told you about yet.  When I do behavioral analysis in Wikipedia, I often need to choose a dimension and process user actions in order over time.  A good example of an action in Wikipedia is an "edit".  There are two really interesting dimensions along which to process edits: page and user.


The diagram above visualizes these two dimensions.  If we look at my edits over time, we are looking at the "user" dimension.  If we look at the article Anarchism's edits over time, we are looking at the "page" dimension.  In order to process data along these dimensions, I'll usually sort and partition the data.  Notably, the designers of Wikipedia's XML dumps had this in mind when they put together their format: they partition based on page and sort based on timestamp.  Yet hadoop's native map/reduce strategy doesn't let me insist that I'll see a page's revisions or a user's edits together and in order.

Enter secondary sort.  Using org.apache.hadoop.mapred.lib.KeyFieldBasedComparator and org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, I can tell hadoop to sort and partition data before the "reduce" phase of the process.  The best part is that these classes are parameterized in the exact same way as UNIX sort.

So, let's say you want to process a user's activities over time.  Set the partitioner to divide the data based on the user_id column and tell hadoop to sort based on the revision timestamp.  Here's what the call looks like for a dataset I'm working with right now (first column == user_id, second column == timestamp):

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D  mapreduce.output.fileoutputformat.compress=false \
    -D  mapreduce.input.fileinputformat.split.minsize=300000000 \
    -D  mapreduce.task.timeout=6000000 \
    -D  stream.num.map.output.key.fields=2 \
    -D  mapreduce.partition.keypartitioner.options='-k1,1n' \
    -D  mapred.output.key.comparator.class="org.apache.hadoop.mapred.lib.KeyFieldBasedComparator" \
    -D  mapreduce.partition.keycomparator.options='-k1,1n -k2,2n' \
    -files       intertime.py \
    -archives    'hdfs:///user/halfak/hadoopin/virtualenv.zip#virtualenv' \
    -input       /user/halfak/enwiki_20141025.*.tsv \
    -output      "/user/halfak/hadoopin/enwiki_intertime/$(date +%Y%m%d%H%M%S)" \
    -mapper      cat \
    -reducer     intertime.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

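The payoff is that, because the partitioner and comparator guarantee each user's rows arrive at the reducer together and already sorted by timestamp, the reducer can be a dead-simple grouping pass over stdin.  This isn't the actual intertime.py, just a sketch of the shape such a reducer can take:

import sys
from itertools import groupby

def rows(stdin):
    for line in stdin:
        yield line.rstrip("\n").split("\t")

# Each user's rows arrive together and sorted by timestamp, so grouping
# is just a matter of watching the first column change.
for user_id, user_rows in groupby(rows(sys.stdin), key=lambda r: r[0]):
    last_timestamp = None
    for row in user_rows:
        timestamp = int(row[1])
        if last_timestamp is not None:
            sys.stdout.write("{0}\t{1}\n".format(user_id, timestamp - last_timestamp))
        last_timestamp = timestamp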
So, I'm stoked.  This will change the way I do my work.

1. Which leaves a bit to be desired[2]
2. Finding which error code to google is worse
