Writing jobs¶
This guide covers everything you need to know to write your job. You’ll probably need to flip between this guide and Runners to find all the information you need.
Defining steps¶
Your job will be defined in a file to be executed on your machine as a Python script, as well as on a Hadoop cluster as an individual map, combine, or reduce task. (See How your program is run for more on that.)
All dependencies must either be contained within the file, available on the task nodes, or uploaded to the cluster by mrjob when your job is submitted. (Runners explains how to do those things.)
The following two sections are more reference-oriented versions of Writing your first job and Writing your second job.
Single-step jobs¶
The simplest way to write a one-step job is to subclass
MRJob
and override a few methods:
from mrjob.job import MRJob
import re
WORD_RE = re.compile(r"[\w']+")
class MRWordFreqCount(MRJob):
def mapper(self, _, line):
for word in WORD_RE.findall(line):
yield word.lower(), 1
def combiner(self, word, counts):
yield word, sum(counts)
def reducer(self, word, counts):
yield word, sum(counts)
if __name__ == '__main__':
MRWordFreqCount.run()
(See Writing your first job for an explanation of this example.)
Here are all the methods you can override to write a one-step job. We’ll explain them over the course of this document.
Multi-step jobs¶
To define multiple steps, override steps()
to return a list of MRStep
s:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
WORD_RE = re.compile(r"[\w']+")
class MRMostUsedWord(MRJob):
def mapper_get_words(self, _, line):
# yield each word in the line
for word in WORD_RE.findall(line):
yield (word.lower(), 1)
def combiner_count_words(self, word, counts):
# sum the words we've seen so far
yield (word, sum(counts))
def reducer_count_words(self, word, counts):
# send all (num_occurrences, word) pairs to the same reducer.
# num_occurrences is so we can easily use Python's max() function.
yield None, (sum(counts), word)
# discard the key; it is just None
def reducer_find_max_word(self, _, word_count_pairs):
# each item of word_count_pairs is (count, word),
# so yielding one results in key=counts, value=word
yield max(word_count_pairs)
def steps(self):
return [
MRStep(mapper=self.mapper_get_words,
combiner=self.combiner_count_words,
reducer=self.reducer_count_words),
MRStep(reducer=self.reducer_find_max_word)
]
if __name__ == '__main__':
MRMostUsedWord.run()
(This example is explained further in Protocols.)
The keyword arguments accepted by MRStep
are the same
as the
method names listed in the previous section,
plus a jobconf
argument which takes a
dictionary of jobconf arguments to pass to Hadoop.
Note
If this is your first time learning about mrjob, you should skip down to Protocols and finish this section later.
Setup and teardown of tasks¶
Remember from How your program is run that your script is invoked once
per task by Hadoop Streaming. It starts your script, feeds it stdin, reads its
stdout, and closes it. mrjob lets you write methods to run at the beginning and
end of this process: the *_init()
and *_final()
methods:
(And the corresponding keyword arguments to MRStep
.)
If you need to load some kind of support file, like a sqlite3
database, or perhaps create a temporary file, you can use these methods to do
so. (See File options for an example.)
*_init()
and *_final()
methods can yield values just like
normal tasks. Here is our word frequency count example rewritten to use
these methods:
from mrjob.job import MRJob
from mrjob.step import MRStep
class MRWordFreqCount(MRJob):
def init_get_words(self):
self.words = {}
def get_words(self, _, line):
for word in WORD_RE.findall(line):
word = word.lower()
self.words.setdefault(word, 0)
self.words[word] = self.words[word] + 1
def final_get_words(self):
for word, val in self.words.iteritems():
yield word, val
def sum_words(self, word, counts):
yield word, sum(counts)
def steps(self):
return [MRStep(mapper_init=self.init_get_words,
mapper=self.get_words,
mapper_final=self.final_get_words,
combiner=self.sum_words,
reducer=self.sum_words)]
In this version, instead of yielding one line per word, the mapper keeps an
internal count of word occurrences across all lines this mapper has seen so
far. The mapper itself yields nothing. When Hadoop Streaming stops sending data
to the map task, mrjob calls final_get_words()
. That function emits
the totals for this task, which is a much smaller set of output lines than the
mapper would have output.
The optimization above is similar to using combiners, demonstrated in Multi-step jobs. It is usually clearer to use a combiner rather than a custom data structure, and Hadoop may run combiners in more places than just the ends of tasks.
Defining command line options has a partial example that shows how to load a
sqlite3
database using mapper_init()
.
Shell commands as steps¶
You can forego scripts entirely for a step by specifying it as a shell command.
To do so, use mapper_cmd
, combiner_cmd
, or reducer_cmd
as arguments
to MRStep
, or override the methods of the same names on
MRJob
. (See mapper_cmd()
,
combiner_cmd()
, and
reducer_cmd()
.)
Warning
The default inline
runner does not support *_cmd()
. If you
want to test locally, use the local
runner (-r local
).
You may mix command and script steps at will. This job will count the number of lines containing the string “kitty”:
from mrjob.job import job
class KittyJob(MRJob):
OUTPUT_PROTOCOL = JSONValueProtocol
def mapper_cmd(self):
return "grep kitty"
def reducer(self, key, values):
yield None, sum(1 for _ in values)
if __name__ == '__main__':
KittyJob.run()
Step commands are run without a shell, so if you want to use pipes, etc, you’ll need to run them in a subshell. For example:
class DemoJob(MRJob):
def mapper_cmd(self):
return 'sh -c "grep 'blah' | wc -l"'
Note
You may not use *_cmd()
with any other options for a task such as
*_filter()
, *_init()
, *_final()
, or a regular
mapper/combiner/reducer function.
Note
You might see an opportunity here to write your MapReduce code in whatever language you please. If that appeals to you, check out upload_files for another piece of the puzzle.
Filtering task input with shell commands¶
You can specify a command to filter a task’s input before it reaches your task
using the mapper_pre_filter
and reducer_pre_filter
arguments to
MRStep
, or override the methods of the same names on
MRJob
. Doing so will cause mrjob to pipe input through
that command before it reaches your mapper.
Warning
The default inline
runner does not support *_pre_filter()
. If
you want to test locally, use the local
runner (-r local
).
Here’s a job that tests filters using grep:
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep
class KittiesJob(MRJob):
OUTPUT_PROTOCOL = JSONValueProtocol
def test_for_kitty(self, _, value):
yield None, 0 # make sure we have some output
if 'kitty' not in value:
yield None, 1
def sum_missing_kitties(self, _, values):
yield None, sum(values)
def steps(self):
return [
MRStep(mapper_pre_filter='grep "kitty"',
mapper=self.test_for_kitty,
reducer=self.sum_missing_kitties)]
if __name__ == '__main__':
KittiesJob.run()
The output of the job should always be 0
, since every line that gets to
test_for_kitty()
is filtered by grep to have “kitty” in
it.
Protocols¶
Hadoop streaming assumes that all data is newline-delimited bytes. By default, mrjob assumes all output is in JSON format, but it can actually read and write lines in any format by using protocols.
(If you need to read non-line-based data, see Passing entire files to the mapper, below.)
Each job has an input protocol, an output protocol, and an internal protocol.
A protocol has a read()
method and a write()
method. The
read()
method converts bytes to pairs of Python objects representing
the keys and values. The write()
method converts a pair of Python
objects back to bytes.
The input protocol is used to read the bytes sent to the first mapper (or reducer, if your first step doesn’t use a mapper). The output protocol is used to write the output of the last step to bytes written to the output file. The internal protocol converts the output of one step to the input of the next if the job has more than one step.
You can specify which protocols your job uses like this:
class MyMRJob(mrjob.job.MRJob):
# these are the defaults
INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
INTERNAL_PROTOCOL = mrjob.protocol.JSONProtocol
OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol
The default input protocol is RawValueProtocol
, which just reads in a line
as a str
. (The line won’t have a trailing newline character because
MRJob
strips it.) So by default, the first step in your
job sees (None, line)
for each line of input [1].
The default output and internal protocols are both JSONProtocol
[2],
which reads and writes JSON strings separated by a tab character. (By default,
Hadoop Streaming uses the tab character to separate keys and values within one
line when it sorts your data.)
If your head hurts a bit, think of it this way: use RawValueProtocol
when you
want to read or write lines of raw text. Use JSONProtocol
when you want to
read or write key-value pairs where the key and value are JSON-enoded bytes.
Note
Hadoop Streaming does not understand JSON, or mrjob protocols. It simply groups lines by doing a string comparison on whatever comes before the first tab character.
See mrjob.protocol
for the full list of protocols built-in to mrjob.
Footnotes
[1] | Experienced Pythonistas might notice that a str is a bytestring
on Python 2, but Unicode on Python 3. That’s right! RawValueProtocol is
an alias for one of two different protocols depending on your Python
version. |
[2] | JSONProtocol is an alias for one of four different
implementations; we try to use the (much faster) ujson library
if it is available, and if not, rapidjson or simplejson
before falling back to the built-in json implementation. |
Data flow walkthrough by example¶
Let’s revisit our example from Multi-step jobs. It has two steps and takes a plain text file as input.
class MRMostUsedWord(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper_get_words,
combiner=self.combiner_count_words,
reducer=self.reducer_count_words),
MRStep(reducer=self.reducer_find_max_word)
]
The first step starts with mapper_get_words()
:
def mapper_get_words(self, _, line):
# yield each word in the line
for word in WORD_RE.findall(line):
yield (word.lower(), 1)
Since the input protocol is RawValueProtocol
, the key will always be None
and the value will be the text of the line.
The function discards the key and yields (word, 1)
for each word in the
line. Since the internal protocol is JSONProtocol
, each component of the
output is serialized to JSON. The serialized components are written to stdout
separated by a tab character and ending in a newline character, like this:
"mrjob" 1
"is" 1
"a" 1
"python" 1
The next two parts of the step are the combiner and reducer:
def combiner_count_words(self, word, counts):
# sum the words we've seen so far
yield (word, sum(counts))
def reducer_count_words(self, word, counts):
# send all (num_occurrences, word) pairs to the same reducer.
# num_occurrences is so we can easily use Python's max() function.
yield None, (sum(counts), word)
In both cases, bytes are deserialized into (word, counts)
by
JSONProtocol
, and the output is serialized as JSON in the same way (because
both are followed by another step). It looks just like the first mapper output,
but the results are summed:
"mrjob" 31
"is" 2
"a" 2
"Python" 1
The final step is just a reducer:
# discard the key; it is just None
def reducer_find_max_word(self, _, word_count_pairs):
# each item of word_count_pairs is (count, word),
# so yielding one results in key=counts, value=word
yield max(word_count_pairs)
Since all input to this step has the same key (None
), a single task will
get all rows. Again, JSONProtocol
will handle deserialization and produce the
arguments to reducer_find_max_word()
.
The output protocol is also JSONProtocol
, so the final output will be:
31 "mrjob"
And we’re done! But that’s a bit ugly; there’s no need to write the key out at
all. Let’s use JSONValueProtocol
instead, so we
only see the JSON-encoded value:
class MRMostUsedWord(MRJob):
OUTPUT_PROTOCOL = JSONValueProtocol
Now we should have code that is identical to
examples/mr_most_used_word.py
in mrjob’s source code. Let’s try running
it (-q
prevents debug logging):
$ python mr_most_used_word.py README.txt -q
"mrjob"
Hooray!
Specifying protocols for your job¶
Usually, you’ll just want to set one or more of the class variables
INPUT_PROTOCOL
,
INTERNAL_PROTOCOL
, and
OUTPUT_PROTOCOL
:
class BasicProtocolJob(MRJob):
# get input as raw strings
INPUT_PROTOCOL = RawValueProtocol
# pass data internally with pickle
INTERNAL_PROTOCOL = PickleProtocol
# write output as JSON
OUTPUT_PROTOCOL = JSONProtocol
If you need more complex behavior, you can override
input_protocol()
,
internal_protocol()
, or
output_protocol()
and return a protocol object
instance. Here’s an example that sneaks a peek at Defining command line options:
class CommandLineProtocolJob(MRJob):
def configure_args(self):
super(CommandLineProtocolJob, self).configure_args()
self.add_passthru_arg(
'--output-format', default='raw', choices=['raw', 'json'],
help="Specify the output format of the job")
def output_protocol(self):
if self.options.output_format == 'json':
return JSONValueProtocol()
elif self.options.output_format == 'raw':
return RawValueProtocol()
Finally, if you need to use a completely different concept of protocol
assignment, you can override pick_protocols()
:
class WhatIsThisIDontEvenProtocolJob(MRJob):
def pick_protocols(self, step_num, step_type):
return random.choice([Protocololol, ROFLcol, Trolltocol, Locotorp])
Writing custom protocols¶
A protocol is an object with methods read(self, line)
and write(self,
key, value)
. The read()
method takes a bytestring and returns a 2-tuple
of decoded objects, and write()
takes the key and value and returns bytes
to be passed back to Hadoop Streaming or as output.
Protocols don’t have to worry about adding or stripping newlines; this
is handled automatically by MRJob
.
Here is a simplified version of mrjob’s JSON protocol:
import json
class JSONProtocol(object):
def read(self, line):
k_str, v_str = line.split('\t', 1)
return json.loads(k_str), json.loads(v_str)
def write(self, key, value):
return '%s\t%s' % (json.dumps(key), json.dumps(value))
You can improve performance significantly by caching the
serialization/deserialization results of keys. Look at the source code of
mrjob.protocol
for an example.
Passing entire files to the mapper¶
New in version 0.6.3.
Sometimes you need to read binary data (e.g. image files), or text-based data that has records longer than one line.
By using mapper_raw()
, you can pass entire files
to your mapper, and read them however you want. Each mapper gets one file,
and is passed both the path of a local copy of the file, and the URI where
the original file is located on Hadoop’s filesystem.
For example, if you want to read .wet
files from
Common Crawl data, you could handle them like
this:
class MRCrawler(MRJob):
def mapper_raw(self, wet_path, wet_uri):
from warcio.archiveiterator import ArchiveIterator
with open(wet_path, 'rb') as f:
for record in ArchiveIterator(f):
...
To use a library like warcio
, you’ll need to ensure that it gets
installed on your cluster. See Using a virtualenv for one way to do
this.
Under the hood, mrjob is passes an input manifest (a list of URIs of input files) to Hadoop, and instructs Hadoop to send one line to each mapper. In most cases, this should be seamless, even to the point of telling you which file was being read when a task fails.
Warning
For all runners except EMR, mrjob uses hadoop fs to download files to the local filesystem, which means Hadoop has to invoke itself. If your cluster has tightly tuned memory requirements, this can sometimes cause an out-of-memory error.
Jar steps¶
You can run Java directly on Hadoop (bypassing Hadoop Streaming) by using
JarStep
instead of MRStep()
.
For example, on EMR you can use a jar to run a script:
from mrjob.job import MRJob
from mrjob.step import JarStep
class ScriptyJarJob(MRJob):
def steps(self):
return [JarStep(
jar='s3://elasticmapreduce/libs/script-runner/script-runner.jar',
args=['s3://my_bucket/my_script.sh'])]
More interesting is combining MRStep
and
JarStep
in the same job. Use
mrjob.step.INPUT
and mrjob.step.OUTPUT
in args to stand for the input and output paths
for that step. For example:
class NaiveBayesJob(MRJob):
def steps(self):
return [
MRStep(mapper=self.mapper, reducer=self.reducer),
JarStep(
jar='elephant-driver.jar',
args=['naive-bayes', INPUT, OUTPUT]
)
]
Changed in version 0.6.6: mrjob no longer passes hadoop generic args (-D and -libjars) to
JarStep
s. If you want them, add mrjob.step.GENERIC_ARGS
to your JarStep
‘s args, and mrjob will automatically interpolate
them.
JarStep
has no concept of Protocols. If your
jar reads input from a MRStep
, or writes input
read by another MRStep
, it is up to those
steps to read and write data in the format your jar expects.
If you are writing the jar yourself, the easiest solution is to have it read and write mrjob’s default protocol (lines containing two JSONs, separated by a tab).
If you are using a third-party jar, you can set custom protocols for the steps
before and after it by overriding pick_protocols()
.
Warning
If the first step of your job is a JarStep
and you
pass in multiple input paths, mrjob will replace
INPUT
with the input paths joined together with a
comma. Not all jars can handle this!
Best practice in this case is to put all your input into a single directory and pass that as your input path.
Using other python modules and packages¶
New in version 0.6.4.
If you want to run Python code outside of the file containing your
MRJob
, you’ll to make sure that code gets uploaded to
Hadoop.
The easiest way to do this is with by setting the
DIRS
attribute in your job. Put the code you
want to import in one or more packages (directories with an
__init__.py
file), and point DIRS
at them:
class MRPackageUsingJob(MRJob):
DIRS = ['mycode', '../someothercode']
...
And then import code from inside a mapper or reducer:
def mapper(self, key, value):
from mycode.custom import important_business_logic
from someotherlibrary import util_function
...
(If you want to import code from the top level of your script rather than
inside a method, make sure it’s in your PYTHONPATH
, just like with
any other code.)
DIRS
is relative to the directory your script is
in (not the current working directory). This works inside Hadoop because the
current working directory is the same as the directory your script is in.
If you want to access individual Python modules or other support code, you
can use FILES
to upload them to your job’s working
directory inside Hadoop:
class MRFileUsingJob(MRJob):
FILES = ['mymodule.py', '../data/zipcodes.db']
def mapper(self, key, value):
from mymodule import open_zipcode_db
with open_zipcode_db('zipcodes.db') as db:
...
For jobs with more complex dependencies (e.g. code that needs to be compiled), you may need to use the setup option. See Job Environment Setup Cookbook for more information.
Defining command line options¶
Recall from How your program is run that your script is executed in
several contexts: once for the initial invocation, and once for each task. If
you just add an option to your job’s option parser, that option’s value won’t
be propagated to other runs of your script. Instead, you can use mrjob’s option
API: add_passthru_arg()
and
add_file_arg()
.
Passthrough options¶
A passthrough option is an argparse
option that mrjob is aware
of. mrjob inspects the value of the option when you invoke your script
and reproduces that value when it invokes your script in other contexts. The
command line-switchable protocol example from before uses this feature:
class CommandLineProtocolJob(MRJob):
def configure_args(self):
super(CommandLineProtocolJob, self).configure_args()
self.add_passthru_arg(
'--output-format', default='raw', choices=['raw', 'json'],
help="Specify the output format of the job")
def output_protocol(self):
if self.options.output_format == 'json':
return JSONValueProtocol()
elif self.options.output_format == 'raw':
return RawValueProtocol()
When you run your script with --output-format=json
, mrjob detects that you
passed --output-format
on the command line. When your script is run in any
other context, such as on Hadoop, it adds --output-format=json
to its
command string.
add_passthru_arg()
takes the same arguments as
argparse.ArgumentParser.add_argument()
. For more information, see the
argparse docs.
Passing through existing options¶
Occasionally, it’ll be useful for mappers, reducers, etc. to be able to see
the value of other command-line options. For this, use
pass_arg_through()
with the corresponding
command-line switch.
For example, you might wish to fetch supporting data for your job from different locations, depending on whether your job is running on EMR or locally:
class MRRunnerAwareJob(MRJob):
def configure_args(self):
super(MRRunnerAwareJob, self).configure_args()
self.pass_arg_through('--runner')
def mapper_init(self):
if self.options.runner == 'emr':
self.data = ... # load from S3
else:
self.data = ... # load from local FS
Note
Keep in mind that self.options.runner
(and the values of most options)
will be None
unless the user explicitly set them with a command-line
switch.
File options¶
A file option is like a passthrough option, but:
- Its value must be a string or list of strings (
action="store"
oraction="append"
), where each string represents either a local path, or an HDFS or S3 path that will be accessible from the task nodes. - That file will be downloaded to each task’s local directory and the value of the option will magically be changed to its path.
For example, if you had a map task that required a sqlite3
database,
you could do this:
class SqliteJob(MRJob):
def configure_args(self):
super(SqliteJob, self).configure_args()
self.add_file_arg('--database')
def mapper_init(self):
# make sqlite3 database available to mapper
self.sqlite_conn = sqlite3.connect(self.options.database)
You could call it any of these ways, depending on where the file is:
$ python sqlite_job.py -r local --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r hadoop --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r hadoop --database=hdfs://my_dir/my_db.sqlite3
$ python sqlite_job.py -r emr --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r emr --database=s3://my_bucket/my_db.sqlite3
In any of these cases, when your task runs, my_db.sqlite3
will always
be available in the task’s working directory, and the value of
self.options.database
will always be set to its path.
See Making files available to tasks if you want to upload a file to your tasks’ working directories without writing a custom command line option.
Warning
You must wait to read files until after class initialization. That means you should use the *_init() methods to read files. Trying to read files into class variables will not work.
Counters¶
Hadoop lets you track counters that are aggregated over a step. A counter has a group, a name, and an integer value. Hadoop itself tracks a few counters automatically. mrjob prints your job’s counters to the command line when your job finishes, and they are available to the runner object if you invoke it programmatically.
To increment a counter from anywhere in your job, use the
increment_counter()
method:
class MRCountingJob(MRJob):
def mapper(self, _, value):
self.increment_counter('group', 'counter_name', 1)
yield _, value
At the end of your job, you’ll get the counter’s total value:
group:
counter_name: 1
Input and output formats¶
Input and output formats are Java classes that determine how your job interfaces with data on Hadoop’s filesystem(s).
Suppose we wanted to write a word frequency count job that wrote output
into a separate directory based on the first letter of the word counted
(a/part-*
, b/part-*
, etc.). We
could accomplish this by using the MultipleValueOutputFormat
class
from the Open Source project
nicknack.
First, we need to tell our job to use the custom output format by setting
HADOOP_OUTPUT_FORMAT
in our job class:
HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'
The output format class is part of a custom JAR, so we need to make sure that
this JAR gets included in Hadoop’s classpath. First
download
the jar to the same directory as your script, and add its name to
LIBJARS
:
LIBJARS = ['nicknack-1.0.0.jar']
(You can skip this step if you’re using a format class that’s built into Hadoop.)
Finally, output your data the way that your output format expects.
MultipleValueOutputFormat
expects the subdirectory name, followed by
a tab, followed the actual line to write into the file.
First, we need to take direct control of how the job writes output by
setting
OUTPUT_PROTOCOL
to
RawValueProtocol
:
OUTPUT_PROTOCOL = RawValueProtocol
Then we need to format the line accordingly. In this case, let’s continue output our final data in the standard format (two JSONs separated by a tab):
def reducer(self, word, counts):
total = sum(counts)
yield None, '\t'.join([word[0], json.dumps(word), json.dumps(total)])
Done! Here’s the full, working job (this is
mrjob.examples.mr_nick_nack
):
import json
import re
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol
WORD_RE = re.compile(r"[A-Za-z]+")
class MRNickNack(MRJob):
HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'
LIBJARS = ['nicknack-1.0.0.jar']
OUTPUT_PROTOCOL = RawValueProtocol
def mapper(self, _, line):
for word in WORD_RE.findall(line):
yield (word.lower(), 1)
def reducer(self, word, counts):
total = sum(counts)
yield None, '\t'.join([word[0], json.dumps(word), json.dumps(total)])
if __name__ == '__main__':
MRNickNack.run()
Input formats work the same way; just set
HADOOP_INPUT_FORMAT
. (You usually won’t need to set
INPUT_PROTOCOL
because it already defaults to
RawValueProtocol
.)