Writing jobs

This guide covers everything you need to know to write your job. You’ll probably need to flip between this guide and Runners to find all the information you need.

Defining steps

Your job will be defined in a file to be executed on your machine as a Python script, as well as on a Hadoop cluster as an individual map, combine, or reduce task. (See How your program is run for more on that.)

All dependencies must either be contained within the file, available on the task nodes, or uploaded to the cluster by mrjob when your job is submitted. (Runners explains how to do those things.)

The following two sections are more reference-oriented versions of Writing your first job and Writing your second job.

Single-step jobs

The simplest way to write a one-step job is to subclass MRJob and override a few methods:

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()

(See Writing your first job for an explanation of this example.)

The methods you can override to write a one-step job are mapper(), combiner(), and reducer(), along with their *_init(), *_final(), *_cmd(), and *_pre_filter() variants (and, for whole-file input, mapper_raw()). We’ll explain them over the course of this document.

Multi-step jobs

To define multiple steps, override steps() to return a list of MRSteps:

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]


if __name__ == '__main__':
    MRMostUsedWord.run()

(This example is explained further in Protocols.)

The keyword arguments accepted by MRStep are the same as the method names listed in the previous section, plus a jobconf argument which takes a dictionary of jobconf arguments to pass to Hadoop.
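
For example, here is a sketch of the steps from the job above with a jobconf setting added to the first step; the property name is a standard Hadoop knob rather than anything mrjob-specific:

def steps(self):
    return [
        MRStep(mapper=self.mapper_get_words,
               combiner=self.combiner_count_words,
               reducer=self.reducer_count_words,
               # ask Hadoop to run this step with a single reducer
               jobconf={'mapreduce.job.reduces': '1'}),
        MRStep(reducer=self.reducer_find_max_word)
    ]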

Note

If this is your first time learning about mrjob, you should skip down to Protocols and finish this section later.

Setup and teardown of tasks

Remember from How your program is run that your script is invoked once per task by Hadoop Streaming. It starts your script, feeds it stdin, reads its stdout, and closes it. mrjob lets you write methods to run at the beginning and end of this process: the *_init() and *_final() methods:

mapper_init()
combiner_init()
reducer_init()
mapper_final()
combiner_final()
reducer_final()

(And the corresponding keyword arguments to MRStep.)

If you need to load some kind of support file, like a sqlite3 database, or perhaps create a temporary file, you can use these methods to do so. (See File options for an example.)

*_init() and *_final() methods can yield values just like normal tasks. Here is our word frequency count example rewritten to use these methods:

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def init_get_words(self):
        self.words = {}

    def get_words(self, _, line):
        for word in WORD_RE.findall(line):
            word = word.lower()
            self.words.setdefault(word, 0)
            self.words[word] = self.words[word] + 1

    def final_get_words(self):
        for word, val in self.words.items():
            yield word, val

    def sum_words(self, word, counts):
        yield word, sum(counts)

    def steps(self):
        return [MRStep(mapper_init=self.init_get_words,
                       mapper=self.get_words,
                       mapper_final=self.final_get_words,
                       combiner=self.sum_words,
                       reducer=self.sum_words)]

In this version, instead of yielding one line per word, the mapper keeps an internal count of word occurrences across all lines this mapper has seen so far. The mapper itself yields nothing. When Hadoop Streaming stops sending data to the map task, mrjob calls final_get_words(). That function emits the totals for this task, which is a much smaller set of output lines than the mapper would have output.

The optimization above is similar to using combiners, demonstrated in Multi-step jobs. It is usually clearer to use a combiner rather than a custom data structure, and Hadoop may run combiners in more places than just the ends of tasks.

Defining command line options has a partial example that shows how to load a sqlite3 database using mapper_init().

Shell commands as steps

You can forego scripts entirely for a step by specifying it as a shell command. To do so, use mapper_cmd, combiner_cmd, or reducer_cmd as arguments to MRStep, or override the methods of the same names on MRJob. (See mapper_cmd(), combiner_cmd(), and reducer_cmd().)

Warning

The default inline runner does not support *_cmd(). If you want to test locally, use the local runner (-r local).

You may mix command and script steps at will. This job will count the number of lines containing the string “kitty”:

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol


class KittyJob(MRJob):

    OUTPUT_PROTOCOL = JSONValueProtocol

    def mapper_cmd(self):
        return "grep kitty"

    def reducer(self, key, values):
        yield None, sum(1 for _ in values)


if __name__ == '__main__':
    KittyJob.run()

Step commands are run without a shell, so if you want to use pipes, etc, you’ll need to run them in a subshell. For example:

class DemoJob(MRJob):

    def mapper_cmd(self):
        return 'sh -c \'grep "blah" | wc -l\''

Note

You may not use *_cmd() with any other options for a task such as *_pre_filter(), *_init(), *_final(), or a regular mapper/combiner/reducer function.

Note

You might see an opportunity here to write your MapReduce code in whatever language you please. If that appeals to you, check out upload_files for another piece of the puzzle.

Filtering task input with shell commands

You can specify a command to filter a task’s input before it reaches your mapper or reducer using the mapper_pre_filter and reducer_pre_filter arguments to MRStep, or by overriding the methods of the same names on MRJob. Doing so will cause mrjob to pipe the input through that command before passing it to your function.

Warning

The default inline runner does not support *_pre_filter(). If you want to test locally, use the local runner (-r local).

Here’s a job that tests filters using grep:

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep


class KittiesJob(MRJob):

    OUTPUT_PROTOCOL = JSONValueProtocol

    def test_for_kitty(self, _, value):
        yield None, 0  # make sure we have some output
        if 'kitty' not in value:
            yield None, 1

    def sum_missing_kitties(self, _, values):
        yield None, sum(values)

    def steps(self):
        return [
            MRStep(mapper_pre_filter='grep "kitty"',
                   mapper=self.test_for_kitty,
                   reducer=self.sum_missing_kitties)]


if __name__ == '__main__':
    KittiesJob.run()

The output of the job should always be 0, since every line that gets to test_for_kitty() is filtered by grep to have “kitty” in it.
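
For example, running the job with the local runner (the inline runner doesn’t support pre-filters, as noted above; the input file name here is hypothetical) should print a single zero:

$ python kitties_job.py -r local input.txt
0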

Protocols

Hadoop Streaming assumes that all data is newline-delimited bytes. By default, mrjob writes its output as JSON, but it can read and write lines in any format by using protocols.

(If you need to read non-line-based data, see Passing entire files to the mapper, below.)

Each job has an input protocol, an output protocol, and an internal protocol.

A protocol has a read() method and a write() method. The read() method converts bytes to pairs of Python objects representing the keys and values. The write() method converts a pair of Python objects back to bytes.

The input protocol is used to read the bytes sent to the first mapper (or reducer, if your first step doesn’t use a mapper). The output protocol is used to serialize the output of the last step into the bytes written to the output file. The internal protocol converts the output of one step into the input of the next when the job has more than one step.

You can specify which protocols your job uses like this:

class MyMRJob(mrjob.job.MRJob):

    # these are the defaults
    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
    INTERNAL_PROTOCOL = mrjob.protocol.JSONProtocol
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol

The default input protocol is RawValueProtocol, which just reads in a line as a str. (The line won’t have a trailing newline character because MRJob strips it.) So by default, the first step in your job sees (None, line) for each line of input [1].

The default output and internal protocols are both JSONProtocol [2], which reads and writes JSON strings separated by a tab character. (By default, Hadoop Streaming uses the tab character to separate keys and values within one line when it sorts your data.)

If your head hurts a bit, think of it this way: use RawValueProtocol when you want to read or write lines of raw text. Use JSONProtocol when you want to read or write key-value pairs where the key and value are JSON-encoded.
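
Illustratively, the two defaults behave roughly like this (the exact bytes-versus-str types depend on your Python version and on which JSON library mrjob picked, so treat this as a sketch):

from mrjob.protocol import JSONProtocol, RawValueProtocol

# JSONProtocol: a (key, value) pair <-> two JSONs separated by a tab
line = JSONProtocol().write('mrjob', 1)   # something like b'"mrjob"\t1'
key, value = JSONProtocol().read(line)    # ('mrjob', 1)

# RawValueProtocol: keys are ignored; the value is the raw line itself
RawValueProtocol().read(line)             # (None, <the whole line>)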

Note

Hadoop Streaming does not understand JSON or mrjob protocols. It simply groups lines by doing a string comparison on whatever comes before the first tab character.

See mrjob.protocol for the full list of protocols built into mrjob.

Footnotes

[1] Experienced Pythonistas might notice that a str is a bytestring on Python 2, but Unicode on Python 3. That’s right! RawValueProtocol is an alias for one of two different protocols depending on your Python version.
[2] JSONProtocol is an alias for one of four different implementations; we try to use the (much faster) ujson library if it is available, and if not, rapidjson or simplejson before falling back to the built-in json implementation.

Data flow walkthrough by example

Let’s revisit our example from Multi-step jobs. It has two steps and takes a plain text file as input.

class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

The first step starts with mapper_get_words():

def mapper_get_words(self, _, line):
    # yield each word in the line
    for word in WORD_RE.findall(line):
        yield (word.lower(), 1)

Since the input protocol is RawValueProtocol, the key will always be None and the value will be the text of the line.

The function discards the key and yields (word, 1) for each word in the line. Since the internal protocol is JSONProtocol, each component of the output is serialized to JSON. The serialized components are written to stdout separated by a tab character and ending in a newline character, like this:

"mrjob" 1
"is"    1
"a" 1
"python"    1

The next two parts of the step are the combiner and reducer:

def combiner_count_words(self, word, counts):
    # sum the words we've seen so far
    yield (word, sum(counts))

def reducer_count_words(self, word, counts):
    # send all (num_occurrences, word) pairs to the same reducer.
    # num_occurrences is so we can easily use Python's max() function.
    yield None, (sum(counts), word)

In both cases, bytes are deserialized into (word, counts) by JSONProtocol, and the output is serialized as JSON in the same way (because both are followed by another step). It looks just like the first mapper output, but the results are summed:

"mrjob" 31
"is"    2
"a" 2
"Python"    1

The final step is just a reducer:

# discard the key; it is just None
def reducer_find_max_word(self, _, word_count_pairs):
    # each item of word_count_pairs is (count, word),
    # so yielding one results in key=counts, value=word
    yield max(word_count_pairs)

Since all input to this step has the same key (None), a single task will get all rows. Again, JSONProtocol will handle deserialization and produce the arguments to reducer_find_max_word().

The output protocol is also JSONProtocol, so the final output will be:

31  "mrjob"

And we’re done! But that’s a bit ugly; there’s no need to write the key out at all. Let’s use JSONValueProtocol instead, so we only see the JSON-encoded value:

class MRMostUsedWord(MRJob):

    OUTPUT_PROTOCOL = JSONValueProtocol

Now we should have code that is identical to examples/mr_most_used_word.py in mrjob’s source code. Let’s try running it (-q prevents debug logging):

$ python mr_most_used_word.py README.txt -q
"mrjob"

Hooray!

Specifying protocols for your job

Usually, you’ll just want to set one or more of the class variables INPUT_PROTOCOL, INTERNAL_PROTOCOL, and OUTPUT_PROTOCOL:

class BasicProtocolJob(MRJob):

    # get input as raw strings
    INPUT_PROTOCOL = RawValueProtocol
    # pass data internally with pickle
    INTERNAL_PROTOCOL = PickleProtocol
    # write output as JSON
    OUTPUT_PROTOCOL = JSONProtocol

If you need more complex behavior, you can override input_protocol(), internal_protocol(), or output_protocol() and return a protocol object instance. Here’s an example that sneaks a peek at Defining command line options:

class CommandLineProtocolJob(MRJob):

    def configure_args(self):
        super(CommandLineProtocolJob, self).configure_args()
        self.add_passthru_arg(
            '--output-format', default='raw', choices=['raw', 'json'],
            help="Specify the output format of the job")

    def output_protocol(self):
        if self.options.output_format == 'json':
            return JSONValueProtocol()
        elif self.options.output_format == 'raw':
            return RawValueProtocol()

Finally, if you need to use a completely different concept of protocol assignment, you can override pick_protocols():

class WhatIsThisIDontEvenProtocolJob(MRJob):

    def pick_protocols(self, step_num, step_type):
        return random.choice([Protocololol, ROFLcol, Trolltocol, Locotorp])

Writing custom protocols

A protocol is an object with methods read(self, line) and write(self, key, value). The read() method takes a bytestring and returns a 2-tuple of decoded objects, and write() takes the key and value and returns bytes to be passed back to Hadoop Streaming or as output.

Protocols don’t have to worry about adding or stripping newlines; this is handled automatically by MRJob.

Here is a simplified version of mrjob’s JSON protocol:

import json


class JSONProtocol(object):

    def read(self, line):
        k_str, v_str = line.split('\t', 1)
        return json.loads(k_str), json.loads(v_str)

    def write(self, key, value):
        return '%s\t%s' % (json.dumps(key), json.dumps(value))

You can improve performance significantly by caching the serialization/deserialization results of keys. Look at the source code of mrjob.protocol for an example.
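
Very roughly, the idea is to skip re-decoding a key when it is identical to the previous one, since sorted reducer input arrives in long runs of the same key. The following is only a sketch of that idea, not mrjob’s actual implementation:

import json


class KeyCachingJSONProtocol(object):

    def __init__(self):
        self._last_key_str = None
        self._last_key = None

    def read(self, line):
        key_str, value_str = line.split('\t', 1)
        # reuse the decoded key if the raw key string hasn't changed
        if key_str != self._last_key_str:
            self._last_key_str = key_str
            self._last_key = json.loads(key_str)
        return self._last_key, json.loads(value_str)

    def write(self, key, value):
        return '%s\t%s' % (json.dumps(key), json.dumps(value))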

Passing entire files to the mapper

New in version 0.6.3.

Sometimes you need to read binary data (e.g. image files), or text-based data that has records longer than one line.

By using mapper_raw(), you can pass entire files to your mapper, and read them however you want. Each mapper gets one file, and is passed both the path of a local copy of the file, and the URI where the original file is located on Hadoop’s filesystem.

For example, if you want to read .wet files from Common Crawl data, you could handle them like this:

class MRCrawler(MRJob):

    def mapper_raw(self, wet_path, wet_uri):
        from warcio.archiveiterator import ArchiveIterator

        with open(wet_path, 'rb') as f:
            for record in ArchiveIterator(f):
                ...

To use a library like warcio, you’ll need to ensure that it gets installed on your cluster. See Using a virtualenv for one way to do this.

Under the hood, mrjob passes an input manifest (a list of URIs of input files) to Hadoop, and instructs Hadoop to send one line to each mapper. In most cases, this should be seamless, even to the point of telling you which file was being read when a task fails.

Warning

For all runners except EMR, mrjob uses hadoop fs to download files to the local filesystem, which means Hadoop has to invoke itself. If your cluster has tightly tuned memory requirements, this can sometimes cause an out-of-memory error.

Jar steps

You can run Java directly on Hadoop (bypassing Hadoop Streaming) by using JarStep instead of MRStep.

For example, on EMR you can use a jar to run a script:

from mrjob.job import MRJob
from mrjob.step import JarStep

class ScriptyJarJob(MRJob):

    def steps(self):
        return [JarStep(
            jar='s3://elasticmapreduce/libs/script-runner/script-runner.jar',
            args=['s3://my_bucket/my_script.sh'])]

More interesting is combining MRStep and JarStep in the same job. Use mrjob.step.INPUT and mrjob.step.OUTPUT in args to stand for the input and output paths for that step. For example:

class NaiveBayesJob(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
            JarStep(
                jar='elephant-driver.jar',
                args=['naive-bayes', INPUT, OUTPUT]
            )
        ]

Changed in version 0.6.6: mrjob no longer passes hadoop generic args (-D and -libjars) to JarSteps. If you want them, add mrjob.step.GENERIC_ARGS to your JarStep's args, and mrjob will automatically interpolate them.
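
For example, reworking the JarStep from the NaiveBayesJob above (and assuming that jar accepts the generic args ahead of its own arguments):

from mrjob.step import GENERIC_ARGS, INPUT, OUTPUT, JarStep

JarStep(
    jar='elephant-driver.jar',
    # mrjob splices -D and -libjars in place of GENERIC_ARGS
    args=[GENERIC_ARGS, 'naive-bayes', INPUT, OUTPUT]
)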

JarStep has no concept of Protocols. If your jar reads input from an MRStep, or writes output read by another MRStep, it is up to those steps to read and write data in the format your jar expects.

If you are writing the jar yourself, the easiest solution is to have it read and write mrjob’s default protocol (lines containing two JSONs, separated by a tab).

If you are using a third-party jar, you can set custom protocols for the steps before and after it by overriding pick_protocols().

Warning

If the first step of your job is a JarStep and you pass in multiple input paths, mrjob will replace INPUT with the input paths joined together with a comma. Not all jars can handle this!

Best practice in this case is to put all your input into a single directory and pass that as your input path.

Using other python modules and packages

New in version 0.6.4.

If you want to run Python code outside of the file containing your MRJob, you’ll need to make sure that code gets uploaded to Hadoop.

The easiest way to do this is by setting the DIRS attribute in your job. Put the code you want to import in one or more packages (directories with an __init__.py file), and point DIRS at them:

class MRPackageUsingJob(MRJob):

    DIRS = ['mycode', '../someothercode']

    ...

And then import code from inside a mapper or reducer:

def mapper(self, key, value):
    from mycode.custom import important_business_logic
    from someothercode.util import util_function
    ...

(If you want to import code from the top level of your script rather than inside a method, make sure it’s in your PYTHONPATH, just like with any other code.)

DIRS is relative to the directory your script is in (not the current working directory). This works inside Hadoop because the current working directory is the same as the directory your script is in.

If you want to access individual Python modules or other support code, you can use FILES to upload them to your job’s working directory inside Hadoop:

class MRFileUsingJob(MRJob):

    FILES = ['mymodule.py', '../data/zipcodes.db']

    def mapper(self, key, value):
        from mymodule import open_zipcode_db
        with open_zipcode_db('zipcodes.db') as db:
            ...

For jobs with more complex dependencies (e.g. code that needs to be compiled), you may need to use the setup option. See Job Environment Setup Cookbook for more information.

Defining command line options

Recall from How your program is run that your script is executed in several contexts: once for the initial invocation, and once for each task. If you just add an option to your job’s option parser, that option’s value won’t be propagated to other runs of your script. Instead, you can use mrjob’s option API: add_passthru_arg() and add_file_arg().

Passthrough options

A passthrough option is an argparse option that mrjob is aware of. mrjob inspects the value of the option when you invoke your script and reproduces that value when it invokes your script in other contexts. The command line-switchable protocol example from before uses this feature:

class CommandLineProtocolJob(MRJob):

    def configure_args(self):
        super(CommandLineProtocolJob, self).configure_args()
        self.add_passthru_arg(
            '--output-format', default='raw', choices=['raw', 'json'],
            help="Specify the output format of the job")

    def output_protocol(self):
        if self.options.output_format == 'json':
            return JSONValueProtocol()
        elif self.options.output_format == 'raw':
            return RawValueProtocol()

When you run your script with --output-format=json, mrjob detects that you passed --output-format on the command line. When your script is run in any other context, such as on Hadoop, it adds --output-format=json to its command string.

add_passthru_arg() takes the same arguments as argparse.ArgumentParser.add_argument(). For more information, see the argparse docs.
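
As another sketch (the job and switch below are made up for illustration), an option declared with type=int arrives on self.options already converted, in every context your script runs in:

from mrjob.job import MRJob


class MRThresholdWordCount(MRJob):

    def configure_args(self):
        super(MRThresholdWordCount, self).configure_args()
        self.add_passthru_arg(
            '--min-count', type=int, default=1,
            help='Only output words with at least this many occurrences')

    def mapper(self, _, line):
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, word, counts):
        total = sum(counts)
        if total >= self.options.min_count:
            yield word, total


if __name__ == '__main__':
    MRThresholdWordCount.run()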

Passing through existing options

Occasionally, it’ll be useful for mappers, reducers, etc. to be able to see the value of other command-line options. For this, use pass_arg_through() with the corresponding command-line switch.

For example, you might wish to fetch supporting data for your job from different locations, depending on whether your job is running on EMR or locally:

class MRRunnerAwareJob(MRJob):

    def configure_args(self):
        super(MRRunnerAwareJob, self).configure_args()

        self.pass_arg_through('--runner')

    def mapper_init(self):
        if self.options.runner == 'emr':
            self.data = ...  # load from S3
        else:
            self.data = ... # load from local FS

Note

Keep in mind that self.options.runner (and the values of most options) will be None unless the user explicitly set them with a command-line switch.

File options

A file option is like a passthrough option, but:

  1. Its value must be a string or list of strings (action="store" or action="append"), where each string represents either a local path, or an HDFS or S3 path that will be accessible from the task nodes.
  2. That file will be downloaded to each task’s local directory and the value of the option will magically be changed to its path.

For example, if you had a map task that required a sqlite3 database, you could do this:

class SqliteJob(MRJob):

    def configure_args(self):
        super(SqliteJob, self).configure_args()
        self.add_file_arg('--database')

    def mapper_init(self):
        # make sqlite3 database available to mapper
        self.sqlite_conn = sqlite3.connect(self.options.database)

You could call it any of these ways, depending on where the file is:

$ python sqlite_job.py -r local  --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r hadoop --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r hadoop --database=hdfs://my_dir/my_db.sqlite3
$ python sqlite_job.py -r emr    --database=/etc/my_db.sqlite3
$ python sqlite_job.py -r emr    --database=s3://my_bucket/my_db.sqlite3

In any of these cases, when your task runs, my_db.sqlite3 will always be available in the task’s working directory, and the value of self.options.database will always be set to its path.

See Making files available to tasks if you want to upload a file to your tasks’ working directories without writing a custom command line option.

Warning

You must wait to read files until after class initialization. That means you should use the *_init() methods to read files. Trying to read files into class variables will not work.
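
In other words, a pattern like the following (reusing the sqlite example above) will not behave as expected; open the database in mapper_init() instead:

class BrokenSqliteJob(MRJob):

    # BAD: this runs when the class is defined, before mrjob has copied
    # the database into the task's working directory
    sqlite_conn = sqlite3.connect('my_db.sqlite3')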

Counters

Hadoop lets you track counters that are aggregated over a step. A counter has a group, a name, and an integer value. Hadoop itself tracks a few counters automatically. mrjob prints your job’s counters to the command line when your job finishes, and they are available to the runner object if you invoke it programmatically.

To increment a counter from anywhere in your job, use the increment_counter() method:

class MRCountingJob(MRJob):

    def mapper(self, _, value):
        self.increment_counter('group', 'counter_name', 1)
        yield _, value

At the end of your job, you’ll get the counter’s total value:

group:
    counter_name: 1
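
If you run your job programmatically, the same totals are available from the runner via its counters() method. A sketch (the module and input file names here are hypothetical):

from mr_counting_job import MRCountingJob


job = MRCountingJob(['-r', 'inline', 'input.txt'])
with job.make_runner() as runner:
    runner.run()
    # one dict of counters per step, e.g. [{'group': {'counter_name': 1}}]
    print(runner.counters())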

Input and output formats

Input and output formats are Java classes that determine how your job interfaces with data on Hadoop’s filesystem(s).

Suppose we wanted to write a word frequency count job that wrote output into a separate directory based on the first letter of the word counted (a/part-*, b/part-*, etc.). We could accomplish this by using the MultipleValueOutputFormat class from the Open Source project nicknack.

First, we need to tell our job to use the custom output format by setting HADOOP_OUTPUT_FORMAT in our job class:

HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'

The output format class is part of a custom JAR, so we need to make sure that this JAR gets included in Hadoop’s classpath. Download the jar to the same directory as your script and add its name to LIBJARS:

LIBJARS = ['nicknack-1.0.0.jar']

(You can skip this step if you’re using a format class that’s built into Hadoop.)

Finally, output your data the way that your output format expects. MultipleValueOutputFormat expects the subdirectory name, followed by a tab, followed by the actual line to write into the file.

First, we need to take direct control of how the job writes output by setting OUTPUT_PROTOCOL to RawValueProtocol:

OUTPUT_PROTOCOL = RawValueProtocol

Then we need to format the line accordingly. In this case, let’s continue to output our final data in the standard format (two JSONs separated by a tab):

def reducer(self, word, counts):
    total = sum(counts)
    yield None, '\t'.join([word[0], json.dumps(word), json.dumps(total)])

Done! Here’s the full, working job (this is mrjob.examples.mr_nick_nack):

import json
import re

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol

WORD_RE = re.compile(r"[A-Za-z]+")


class MRNickNack(MRJob):

    HADOOP_OUTPUT_FORMAT = 'nicknack.MultipleValueOutputFormat'

    LIBJARS = ['nicknack-1.0.0.jar']

    OUTPUT_PROTOCOL = RawValueProtocol

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def reducer(self, word, counts):
        total = sum(counts)
        yield None, '\t'.join([word[0], json.dumps(word), json.dumps(total)])


if __name__ == '__main__':
    MRNickNack.run()

Input formats work the same way; just set HADOOP_INPUT_FORMAT. (You usually won’t need to set INPUT_PROTOCOL because it already defaults to RawValueProtocol.)
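
For example (the format class and jar names below are hypothetical), a job that reads its input through a custom input format might start like this:

class MRCustomInputJob(MRJob):

    HADOOP_INPUT_FORMAT = 'org.example.myformat.MyInputFormat'

    LIBJARS = ['myformat.jar']

    def mapper(self, _, line):
        ...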