Installation¶
Install with pip:
$ pip install mrjob
Or from a source checkout:
$ python setup.py test && python setup.py install
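To confirm the install worked, a quick sanity check (assuming a standard install) is to print the package version:
$ python -c "import mrjob; print(mrjob.__version__)"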
Writing your first job¶
Open a file called
mr_word_count.py and type this into it:
from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()
Now go back to the command line, find your favorite body of text (such as mrjob’s README.rst, or even your new file mr_word_count.py), and try this:
$ python mr_word_count.py my_file.txt
You should see something like this:
"chars" 3654 "lines" 123 "words" 417
Congratulations! You’ve just written and run your first program with mrjob.
A job is defined by a class that inherits from MRJob. This class contains methods that define the steps of your job. A “step” consists of a mapper, a combiner, and a reducer. All of those are optional, though you must have at least one. So you could have a step that’s just a mapper, or just a combiner and a reducer.
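For instance, here’s a minimal sketch of a job whose single step is just a mapper (the class name and behavior are made up for illustration):

from mrjob.job import MRJob


class MRUpperCaser(MRJob):

    # this step has only a mapper; no combiner or reducer is defined
    def mapper(self, _, line):
        yield None, line.upper()


if __name__ == '__main__':
    MRUpperCaser.run()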
The mapper() method takes a key and a value as args (in MRWordFrequencyCount above, the key is ignored and a single line of text input is the value) and yields as many key-value pairs as it likes. The reducer() method takes a key and an iterator of values and also yields as many key-value pairs as it likes. (In the word count job, it sums the values for each key, which represent the numbers of characters, words, and lines in the input.)
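Because mapper() and reducer() are ordinary generator methods, you can get a feel for this flow by calling them directly in an interactive session (a quick sketch; the input line and value counts are just examples):

from mr_word_count import MRWordFrequencyCount

# pass an empty argument list so mrjob doesn't try to parse sys.argv
job = MRWordFrequencyCount([])

# the mapper yields one pair per metric for this line
print(list(job.mapper(None, "hello world")))
# [('chars', 11), ('words', 2), ('lines', 1)]

# the reducer sums all the values it sees for a key
print(list(job.reducer("words", iter([2, 2, 1]))))
# [('words', 5)]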
Warning: Forgetting the following information will result in confusion.
The final required component of a job file is these two lines at the end of the file, every time:
if __name__ == '__main__':
    MRWordCounter.run()  # where MRWordCounter is your job class
These lines pass control over the command line arguments and execution to mrjob. Without them, your job will not work. For more information, see Hadoop Streaming and mrjob and Why can’t I put the job class and run code in the same file?.
Running your job different ways¶
The most basic way to run your job is on the command line:
$ python my_job.py input.txt
By default, output will be written to stdout.
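If you’d rather capture the output, redirect it as you would with any command-line program (counts.txt is just an example name):
$ python my_job.py input.txt > counts.txt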
You can pass input via stdin, but be aware that mrjob will just dump it to a file first:
$ python my_job.py < input.txt
You can pass multiple input files, mixed with stdin (using the - character):
$ python my_job.py input1.txt input2.txt - < input3.txt
By default, mrjob will run your job in a single Python process. This provides the friendliest debugging experience, but it’s not exactly distributed computing!
You change the way the job is run with the -r/--runner option. You can use -r inline (the default), -r local, -r hadoop, or -r emr.
To run your job in multiple subprocesses with a few Hadoop features simulated, use -r local.
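For example (input.txt stands in for your own input file):
$ python my_job.py -r local input.txt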
To run it on your Hadoop cluster, use -r hadoop.
If you have Dataproc configured (see Dataproc Quickstart), you can run it there with -r dataproc.
Your input files can come from HDFS if you’re using Hadoop, or GCS if you’re using Dataproc:
$ python my_job.py -r dataproc gs://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt
If you have Elastic MapReduce configured (see Elastic MapReduce Quickstart), you can run it there with -r emr.
Your input files can come from HDFS if you’re using Hadoop, or S3 if you’re using EMR:
$ python my_job.py -r emr s3://my-inputs/input.txt
$ python my_job.py -r hadoop hdfs://my_home/input.txt
If your code spans multiple files, see Uploading your source tree.
Writing your second job¶
Here’s a job that finds the most commonly used word in the input:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
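You run this job the same way as the first one. Assuming you saved it as mr_most_used_word.py (a filename chosen here for illustration):
$ python mr_most_used_word.py my_file.txt
The single line of output has the highest count as its key and the winning word as its value.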
Configuration¶
mrjob has an overflowing cornucopia of configuration options. You’ll want to specify some on the command line, some in a config file.
You can put a config file at ./mrjob.conf for mrjob to find it without passing it via --conf-path.
Config files are interpreted as YAML if you have the yaml module installed. Otherwise, they are interpreted as JSON.
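The yaml module is provided by the PyYAML package, so if you’d like YAML config files:
$ pip install pyyaml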
See Config file format and location for in-depth information. Here is an example file:
runners:
  emr:
    aws_region: us-west-2
  inline:
    local_tmp_dir: $HOME/.tmp
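Since config files fall back to JSON when PyYAML isn’t available, the same settings could also be written like this (a sketch of the JSON equivalent):
{
  "runners": {
    "emr": {
      "aws_region": "us-west-2"
    },
    "inline": {
      "local_tmp_dir": "$HOME/.tmp"
    }
  }
}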