My python workflow
So, I try to do as much work as I can with input/output streams. I'm a bit of a unix geek, so that means I'm thinking about standard in and standard out, and writing my analytics code as though I were supplementing the UNIX core utils. This has nice computational practicalities, and it allows you to stream operations together.
Computational practicalities
The worst bottleneck you're likely to encounter in the course of doing data analysis on large files is memory. Most statistical environments specifically support in-memory analysis of datasets to the exclusion of streaming. This means that, in order to get any work done, you need to first copy all of your data from disk to memory. If your dataset is larger than the available memory, you'll likely end up crashing the machine you are working with -- or at least rendering it unusable until your process finally crashes.
On the other hand, python, perl and unix utilities (like cut, shuf, sort, sed, etc.) afford some powerful capabilities for working with datasets as streams, dramatically reducing the memory footprint of a dataset. As the diagram above makes apparent, if you can fit your data operation into a streaming job, then it doesn't matter how big your dataset is and you'll end up needing very little memory.
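To make that concrete, here's a minimal sketch of what a stream-friendly python filter looks like. The filter condition is just a placeholder; the point is that it reads one TSV row at a time from standard in and writes to standard out, so it only ever holds a single line in memory.
import sys

# A pure stream filter: read TSV rows from stdin, write the ones we keep to stdout.
# Only one line is ever held in memory at a time, regardless of file size.
for line in sys.stdin:
    fields = line.rstrip("\n").split("\t")
    if len(fields) >= 2 and fields[1] != "":  # placeholder condition: keep rows with a second column
        sys.stdout.write(line)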
Streaming operations together
I like Doug McIlroy's summary of the unix philosophy:
Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.
By following these principles, you can build "operators" rather than stand-alone programs. This allows you to write less code (which is a Good Thing™) and get more done. Recently, I wrote a paper that required me to wrangle 12 different datasets from 7 different websites -- and being able to integrate my code with unix core utilities was a godsend. Here's a line from my project's Makefile that I think captures the amazingness.
(echo -e "user_id\tintertime\ttype";
bzcat datasets/originals/aol_search.tsv.bz2 |
tail -n+2 |
sort -k1,2 |
./intertimes |
sed -r "s/(.*)/\1\tapp view/" |
shuf -n 100000) > datasets/aol_search_intertime.sample.tsv
These lines achieve the following steps of a streaming job:
- Print out new headers: user_id, intertime, type
- Decompress the aol_search.tsv.bz2 dataset
- Trim off the original header line
- Sort the dataset by the first two columns (user_id, timestamp)
- Compute the time between events per user (this is my python)
- Add a column of "app view" corresponding to the "type" header
- Randomly sample 100000 observations
- Write it all out to a new data file.
In order to perform this complex operation over 28 million rows, I wrote ~30 lines of python code and put it in the middle of a few unix utilities that do most of the heavy lifting for me. The process took about 2 minutes and used a couple hundred MB of memory. The random sample that we finally arrive at contains only 100k rows and will happily load into memory in about a second.
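For the curious, the ./intertimes script follows the same stdin/stdout pattern. This isn't the exact code, but a rough sketch of its shape, assuming the input is already sorted by (user_id, timestamp) and that timestamps are epoch seconds:
import sys

# Assumes TSV rows sorted by (user_id, timestamp), as produced by `sort -k1,2`.
# Emits "user_id<TAB>intertime" for each consecutive pair of a user's events.
last_user = None
last_timestamp = None
for line in sys.stdin:
    user_id, timestamp = line.rstrip("\n").split("\t")[:2]
    timestamp = int(timestamp)  # assumption: epoch seconds
    if user_id == last_user:
        sys.stdout.write("{0}\t{1}\n".format(user_id, timestamp - last_timestamp))
    last_user, last_timestamp = user_id, timestamp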
Bringing it together with Hadoop streaming
In case you've been living under a rock for the last 5 years or so: Hadoop is a framework for performing map/reduce operations that has become the industry standard. Through Hadoop's streaming interface, I can make use of UNIXy streaming utilities to get my python work done faster. Since I'm already thinking about my work as processing data one row at a time, map/reduce is not that much of a leap.
However, there's one part of my streaming scripts that is very important to me and that I haven't told you about yet. When I do behavioral analysis in Wikipedia, I often need to choose a dimension and process user actions in order over time. A good example of actions in Wikipedia is an "edit". There are two really interesting dimensions along which to process edits: page and user.
The diagram above visualizes these two dimensions. If we look at my edits over time, we are looking at the "user" dimension. If we look at the article Anarchism's edits over time, we are looking at the "page" dimension. In order to process data along these dimensions, I'll usually sort and partition the data. Notably, the designers of Wikipedia's XML dumps had this in mind when they put together their format: they partition based on page and sort based on timestamp. Yet, hadoop's native map/reduce strategy doesn't afford the ability to insist that I'll see a page's revisions or a user's edits together and in order.

Enter secondary sort. Using org.apache.hadoop.mapred.lib.KeyFieldBasedComparator and org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, I can tell hadoop to sort and partition data before the "reduce" phase of the process. The best part is, these guys are parameterized in the exact same way as UNIX sort!
So, let's say you want to process a user's activities over time. Set the partitioner to divide the data based on the user_id column and tell hadoop to sort based on the revision timestamp. Here's what the call looks like for a dataset I'm working with right now (first column == user_id, second column == timestamp):
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapreduce.output.fileoutputformat.compress=false \
-D mapreduce.input.fileinputformat.split.minsize=300000000 \
-D mapreduce.task.timeout=6000000 \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.partition.keypartitioner.options='-k1,1n' \
-D mapred.output.key.comparator.class="org.apache.hadoop.mapred.lib.KeyFieldBasedComparator" \
-D mapreduce.partition.keycomparator.options='-k1,1n -k2,2n' \
-files intertime.py \
-archives 'hdfs:///user/halfak/hadoopin/virtualenv.zip#virtualenv' \
-input /user/halfak/enwiki_20141025.*.tsv \
-output "/user/halfak/hadoopin/enwiki_intertime/$(date +%Y%m%d%H%M%S)" \
-mapper cat \
-reducer intertime.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
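Because the partitioner guarantees that all of a user's rows land in the same reducer and the comparator guarantees they arrive sorted by timestamp, the reducer can stay a plain stdin-to-stdout python script. Again, this is a sketch of the idea rather than the actual intertime.py, assuming (user_id, timestamp) in the first two columns with epoch-second timestamps:
import sys
from itertools import groupby

# Rows arrive partitioned by user_id (column 1) and sorted by timestamp (column 2),
# so we can group a user's events without buffering more than one group at a time.
def rows():
    for line in sys.stdin:
        user_id, timestamp = line.rstrip("\n").split("\t")[:2]
        yield user_id, int(timestamp)  # assumption: epoch seconds

for user_id, events in groupby(rows(), key=lambda row: row[0]):
    last = None
    for _, timestamp in events:
        if last is not None:
            sys.stdout.write("{0}\t{1}\n".format(user_id, timestamp - last))
        last = timestamp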
So, I'm stoked. This will change the way I do my work.
1. Which leaves a bit to be desired[2]
2. Finding which error code to google is worse