mrjob.job - defining your job

class mrjob.job.MRJob(args=None)

The base class for all MapReduce jobs. See __init__() for details.

One-step jobs

MRJob.mapper(key, value)

Re-define this to define the mapper for a one-step job.

Yields zero or more tuples of (out_key, out_value).

Parameters:
  • key – A value parsed from input.
  • value – A value parsed from input.

If you don’t re-define this, your job will have a mapper that simply yields (key, value) as-is.

By default (if you don’t mess with Protocols):
  • key will be None
  • value will be the raw input line, with newline stripped.
  • out_key and out_value must be JSON-encodable: numeric, unicode string, boolean, None, list, or dict whose keys are unicode strings.
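
A minimal sketch of a one-step mapper under these defaults (the class name is illustrative, not part of the API); it counts characters, words, and lines, so every out_key is a plain string and every out_value is numeric:

from mrjob.job import MRJob

class MRCountStats(MRJob):  # hypothetical example job

    def mapper(self, _, line):
        # key is None under the default input protocol, so it's ignored;
        # line is one raw input line with the newline stripped
        yield 'chars', len(line)
        yield 'words', len(line.split())
        yield 'lines', 1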
MRJob.reducer(key, values)

Re-define this to define the reducer for a one-step job.

Yields one or more tuples of (out_key, out_value)

Parameters:
  • key – A key which was yielded by the mapper
  • values – A generator which yields all values yielded by the mapper which correspond to key.
By default (if you don’t mess with Protocols):
  • out_key and out_value must be JSON-encodable.
  • key and the items in values will have been decoded from JSON (so tuples will become lists).
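
Continuing the sketch above, a minimal reducer that sums the counts for each key (note that values is a generator and can only be iterated once):

def reducer(self, key, values):
    # values yields everything the mappers emitted for this key
    yield key, sum(values)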
MRJob.combiner(key, values)

Re-define this to define the combiner for a one-step job.

Yields one or more tuples of (out_key, out_value)

Parameters:
  • key – A key which was yielded by the mapper
  • values – A generator which yields all values yielded by one mapper task/node which correspond to key.
By default (if you don’t mess with Protocols):
  • out_key and out_value must be JSON-encodable.
  • key and the items in values will have been decoded from JSON (so tuples will become lists).
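
Because summing is associative and commutative, the combiner in the sketch above can simply reuse the reducer logic:

def combiner(self, key, values):
    # pre-aggregate within one mapper task to reduce shuffle volume
    yield key, sum(values)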
MRJob.mapper_init()

Re-define this to define an action to run before the mapper processes any input.

One use for this function is to initialize mapper-specific helper structures.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.mapper_final()

Re-define this to define an action to run after the mapper reaches the end of input.

One way to use this is to store a total in an instance variable, and output it after reading all input data. See mrjob.examples for an example.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.
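
A hedged sketch of the mapper_init()/mapper_final() pattern described above: accumulate a per-task total in an instance variable and emit it once at the end (the class and attribute names are illustrative):

from mrjob.job import MRJob

class MRLineTotal(MRJob):  # hypothetical example job

    def mapper_init(self):
        # set up a per-task accumulator before any input is read
        self.line_count = 0

    def mapper(self, _, line):
        self.line_count += 1

    def mapper_final(self):
        # emit a single pair per mapper task after all input is read
        yield 'lines', self.line_count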

MRJob.reducer_init()

Re-define this to define an action to run before the reducer processes any input.

One use for this function is to initialize reducer-specific helper structures.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.reducer_final()

Re-define this to define an action to run after the reducer reaches the end of input.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.combiner_init()

Re-define this to define an action to run before the combiner processes any input.

One use for this function is to initialize combiner-specific helper structures.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.combiner_final()

Re-define this to define an action to run after the combiner reaches the end of input.

Yields one or more tuples of (out_key, out_value).

By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

MRJob.mapper_cmd()

Re-define this to define the mapper for a one-step job as a shell command. If you define your mapper this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For important specifics, see Shell commands as steps.

Basic example:

def mapper_cmd(self):
    return 'cat'
MRJob.reducer_cmd()

Re-define this to define the reducer for a one-step job as a shell command. If you define your reducer this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For specifics, see Shell commands as steps.

Basic example:

def reducer_cmd(self):
    return 'cat'
MRJob.combiner_cmd()

Re-define this to define the combiner for a one-step job as a shell command. If you define your combiner this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For specifics, see Shell commands as steps.

Basic example:

def combiner_cmd(self):
    return 'cat'
MRJob.mapper_pre_filter()

Re-define this to specify a shell command to filter the mapper’s input before it gets to your job’s mapper in a one-step job. For important specifics, see Filtering task input with shell commands.

Basic example:

def mapper_pre_filter(self):
    return 'grep "ponies"'
MRJob.reducer_pre_filter()

Re-define this to specify a shell command to filter the reducer’s input before it gets to your job’s reducer in a one-step job. For important specifics, see Filtering task input with shell commands.

Basic example:

def reducer_pre_filter(self):
    return 'grep "ponies"'
MRJob.combiner_pre_filter()

Re-define this to specify a shell command to filter the combiner’s input before it gets to your job’s combiner in a one-step job. For important specifics, see Filtering task input with shell commands.

Basic example:

def combiner_pre_filter(self):
    return 'grep "ponies"'
MRJob.mapper_raw(input_path, input_uri)

Re-define this to make Hadoop pass one input file to each mapper.

Parameters:
  • input_path – a local path that the input file has been copied to
  • input_uri – the URI of the input file on HDFS, S3, etc

New in version 0.6.3.
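
A hedged sketch of a mapper_raw() method (the class name is illustrative): it opens the local copy of the file and keys its output by the original URI:

from mrjob.job import MRJob

class MRFileLineCounts(MRJob):  # hypothetical example job

    def mapper_raw(self, input_path, input_uri):
        # input_path is a local copy of one input file;
        # input_uri is where that file lives on HDFS, S3, etc.
        with open(input_path) as f:
            yield input_uri, sum(1 for _ in f)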

MRJob.spark(input_path, output_path)

Re-define this with Spark code to run. You can read input with input_path and output with output_path.

Warning

Prior to v0.6.8, to pass job methods into Spark (rdd.flatMap(self.some_method)), you first had to call self.sandbox(); otherwise Spark would error because self was not serializable.
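
A hedged sketch of a spark() method, assuming pyspark is available where the step runs; it reads lines from input_path, counts words, and writes the result to output_path:

def spark(self, input_path, output_path):
    # import pyspark inside the method so the job can still be
    # inspected on machines without Spark installed
    from pyspark import SparkContext

    sc = SparkContext(appName='wordcount-sketch')

    counts = (sc.textFile(input_path)
                .flatMap(lambda line: line.split())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))

    counts.saveAsTextFile(output_path)
    sc.stop()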

Multi-step jobs

MRJob.steps()

Re-define this to make a multi-step job.

If you don’t re-define this, we’ll automatically create a one-step job using any of mapper(), mapper_init(), mapper_final(), reducer_init(), reducer_final(), and reducer() that you’ve re-defined. For example:

def steps(self):
    return [MRStep(mapper=self.transform_input,
                   reducer=self.consolidate_1),
            MRStep(reducer_init=self.log_mapper_init,
                   reducer=self.consolidate_2)]
Returns: a list of steps constructed with MRStep or other classes in mrjob.step.

Running the job

classmethod MRJob.run()

Entry point for running job from the command-line.

This is also the entry point when a mapper or reducer is run by Hadoop Streaming.

Does one of:
  • Run the entire job. See run_job().
  • Run a single mapper, combiner, or reducer for one step, when invoked by Hadoop Streaming. See run_mapper(), run_combiner(), and run_reducer().

MRJob.__init__(args=None)

Entry point for running your job from other Python code.

You can pass in command-line arguments, and the job will act the same way it would if it were run from the command line. For example, to run your job on EMR:

mr_job = MRYourJob(args=['-r', 'emr'])
with mr_job.make_runner() as runner:
    ...

Passing in None is the same as passing in sys.argv[1:]

For a full list of command-line arguments, run: python -m mrjob.job --help

Parameters: args – Arguments to your script (switches and input files)

Changed in version 0.7.0: Previously, args set to None was equivalent to [].

MRJob.make_runner()

Make a runner based on command-line arguments, so we can launch this job on EMR, on Hadoop, or locally.

Return type: mrjob.runner.MRJobRunner

Parsing output

MRJob.parse_output(chunks)

Parse the final output of this MRJob (as a stream of byte chunks) into a stream of (key, value).
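
A hedged sketch of how __init__(), make_runner(), and parse_output() fit together. MRYourJob and input.txt are placeholders, and runner.cat_output() is assumed to be the runner method that streams the job's output as byte chunks:

mr_job = MRYourJob(args=['-r', 'local', 'input.txt'])

with mr_job.make_runner() as runner:
    runner.run()
    # parse_output() decodes the byte chunks using OUTPUT_PROTOCOL
    for key, value in mr_job.parse_output(runner.cat_output()):
        print(key, value)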

Counters and status messages

MRJob.increment_counter(group, counter, amount=1)

Increment a counter in Hadoop streaming by printing to stderr.

Parameters:
  • group (str) – counter group
  • counter (str) – description of the counter
  • amount (int) – how much to increment the counter by

Commas in counter or group will be automatically replaced with semicolons (commas confuse Hadoop streaming).
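
A hedged sketch of a mapper that counts bad input rather than failing on it (the group and counter names are illustrative):

def mapper(self, _, line):
    if not line.strip():
        # shows up in Hadoop's counter output under the "Bad input" group
        self.increment_counter('Bad input', 'empty lines')
        return
    yield 'lines', 1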

MRJob.set_status(msg)

Set the job status in hadoop streaming by printing to stderr.

This is also a good way of doing a keepalive for a job that goes a long time between outputs; Hadoop streaming usually times out jobs that give no output for longer than 10 minutes.

Setting protocols

MRJob.INPUT_PROTOCOL = <class 'mrjob.protocol.BytesValueProtocol'>

Protocol for reading input to the first mapper in your job. Default: RawValueProtocol.

For example, if you knew your input data were in JSON format, you could set:

INPUT_PROTOCOL = JSONValueProtocol

in your class, and your initial mapper would receive decoded JSONs rather than strings.

See mrjob.protocol for the full list of protocols.

MRJob.INTERNAL_PROTOCOL = <class 'mrjob.protocol.StandardJSONProtocol'>

Protocol for communication between steps and final output. Default: JSONProtocol.

For example, if your step output weren’t JSON-encodable, you could set:

INTERNAL_PROTOCOL = PickleProtocol

and step output would be encoded as string-escaped pickles.

See mrjob.protocol for the full list of protocols.

MRJob.OUTPUT_PROTOCOL = <class 'mrjob.protocol.StandardJSONProtocol'>

Protocol to use for writing output. Default: JSONProtocol.

For example, if you wanted the final output in repr, you could set:

OUTPUT_PROTOCOL = ReprProtocol

See mrjob.protocol for the full list of protocols.
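
A hedged sketch tying the three protocol attributes together; JSONProtocol and JSONValueProtocol come from mrjob.protocol, and the 'user_id' field is purely illustrative:

from mrjob.job import MRJob
from mrjob.protocol import JSONProtocol, JSONValueProtocol

class MRUserEventCount(MRJob):  # hypothetical example job

    INPUT_PROTOCOL = JSONValueProtocol   # each input line is one JSON value
    INTERNAL_PROTOCOL = JSONProtocol     # JSON key/value pairs between steps
    OUTPUT_PROTOCOL = JSONProtocol       # JSON key/value pairs as final output

    def mapper(self, _, record):
        # record is already a decoded JSON object, not a raw line
        yield record['user_id'], 1

    def reducer(self, user_id, counts):
        yield user_id, sum(counts)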

MRJob.input_protocol()

Instance of the protocol to use to convert input lines to Python objects. Default behavior is to return an instance of INPUT_PROTOCOL.

MRJob.internal_protocol()

Instance of the protocol to use to communicate between steps. Default behavior is to return an instance of INTERNAL_PROTOCOL.

MRJob.output_protocol()

Instance of the protocol to use to convert Python objects to output lines. Default behavior is to return an instance of OUTPUT_PROTOCOL.

MRJob.pick_protocols(step_num, step_type)

Pick the protocol classes to use for reading and writing for the given step.

Parameters:
  • step_num (int) – which step to run (e.g. 0 for the first step)
  • step_type (str) – one of ‘mapper’, ‘combiner’, or ‘reducer’
Returns:

(read_function, write_function)

By default, we use one protocol for reading input, one internal protocol for communication between steps, and one protocol for final output (which is usually the same as the internal protocol). Protocols can be controlled by setting INPUT_PROTOCOL, INTERNAL_PROTOCOL, and OUTPUT_PROTOCOL.

Re-define this if you need fine control over which protocols are used by which steps.

Secondary sort

MRJob.SORT_VALUES = None

Set this to True if you would like reducers to receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.

This can be useful if you expect more values than you can fit in memory to be associated with one key, but you want to apply information in a small subset of these values to information in the other values. For example, you may want to convert counts to percentages, and to do this you first need to know the total count.

Even though values are sorted by their encoded value, most encodings will sort strings in order. For example, you could have values like: ['A', <total>], ['B', <count_name>, <count>], and the value containing the total should come first regardless of what protocol you’re using.

See jobconf() and partitioner() for more about how secondary sort is implemented.
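
A hedged sketch of the counts-to-percentages pattern described above. It assumes the mapper (not shown) yields values shaped like ['A', <total>] and ['B', <name>, <count>] under the same key, so the 'A' value sorts first:

from mrjob.job import MRJob

class MRCountsToPercentages(MRJob):  # hypothetical example job

    SORT_VALUES = True

    def reducer(self, key, values):
        total = None
        for value in values:
            if value[0] == 'A':
                # ['A', <total>] sorts ahead of the 'B' values
                total = value[1]
            else:
                # ['B', <name>, <count>]
                _, name, count = value
                yield (key, name), count / total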

Command-line options

See Defining command line options for information on adding command line options to your job. See Configuration quick reference for a complete list of all configuration options.

MRJob.configure_args()

Define arguments for this script. Called from __init__().

Re-define to define custom command-line arguments or pass through existing ones:

def configure_args(self):
    super(MRYourJob, self).configure_args()

    self.add_passthru_arg(...)
    self.add_file_arg(...)
    self.pass_arg_through(...)
    ...
MRJob.add_passthru_arg(*args, **kwargs)

Function to create options which both the job runner and the job itself respect (we use this for protocols, for example).

Use it like you would use argparse.ArgumentParser.add_argument():

def configure_args(self):
    super(MRYourJob, self).configure_args()
    self.add_passthru_arg(
        '--max-ngram-size', type=int, default=4, help='...')

If you want to pass files through to the mapper/reducer, use add_file_arg() instead.

If you want to pass through a built-in option (e.g. --runner), use pass_arg_through() instead.

MRJob.add_file_arg(*args, **kwargs)

Add a command-line option that sends an external file (e.g. a SQLite DB) to Hadoop:

def configure_args(self):
   super(MRYourJob, self).configure_args()
   self.add_file_arg('--scoring-db', help=...)

This does the right thing: the file will be uploaded to the working dir of the script on Hadoop, and the script will be passed the same option, but with the local name of the file in the script’s working directory.

Note

If you pass a file to a job, best practice is to lazy-load its contents (e.g. make a method that opens the file the first time you call it) rather than loading it in your job’s constructor or load_args(). Not only is this more efficient, it’s necessary if you want to run your job in a Spark executor (because the file may not be in the same place in a Spark driver).
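
A hedged sketch of the lazy-loading pattern from the note above, reusing the --scoring-db option from the earlier example (the class and helper method names are illustrative):

import sqlite3

from mrjob.job import MRJob

class MRScoreRecords(MRJob):  # hypothetical example job

    def configure_args(self):
        super(MRScoreRecords, self).configure_args()
        self.add_file_arg('--scoring-db')

    def scoring_db(self):
        # open the uploaded file the first time it's needed, not in
        # __init__() or load_args(), so it also works in a Spark executor
        if not hasattr(self, '_scoring_db'):
            self._scoring_db = sqlite3.connect(self.options.scoring_db)
        return self._scoring_db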

Note

We advise against sending Berkeley DBs to your job, as Berkeley DB is not forwards-compatible (so a Berkeley DB that you construct on your computer may not be readable from within Hadoop). Use SQLite databases instead. If all you need is an on-disk hash table, try out the sqlite3dbm module.

Changed in version 0.6.6: now accepts explicit type=str

Changed in version 0.6.8: fully supported on Spark, including local[*] master

MRJob.pass_arg_through(opt_str)

Pass the given argument through to the job.

MRJob.load_args(args)

Load command-line options into self.options.

Called from __init__() after configure_args().

Parameters: args (list of str) – a list of command line arguments. None will be treated the same as [].

Re-define if you want to post-process command-line arguments:

def load_args(self, args):
    super(MRYourJob, self).load_args(args)

    self.stop_words = self.options.stop_words.split(',')
    ...
MRJob.is_task()

True if this is a mapper, combiner, reducer, or Spark script.

This is mostly useful inside load_args(), to disable loading args when we aren’t running inside Hadoop.
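
A hedged sketch of using is_task() inside load_args(), building on the --stop-words option from the earlier example (a passthrough option that is assumed to exist):

def load_args(self, args):
    super(MRYourJob, self).load_args(args)

    if self.is_task():
        # only do task-time setup when actually running a mapper,
        # combiner, reducer, or Spark script
        self.stop_words = set(self.options.stop_words.split(','))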

Uploading support files

MRJob.FILES = []

Optional list of files to upload to the job’s working directory. These can be URIs or paths on the local filesystem.

Relative paths will be interpreted as relative to the directory containing the script (not the current working directory). Environment variables and ~ in paths will be expanded.

If you want a file to be uploaded to a filename other than its own, append #<name> (e.g. data/foo.json#bar.json).

If you need to dynamically generate a list of files, override files() instead.

New in version 0.6.4.

MRJob.DIRS = []

Optional list of directories to upload to the job’s working directory. These can be URIs or paths on the local filesystem.

Relative paths will be interpreted as relative to the directory containing the script (not the current working directory). Environment variables and ~ in paths will be expanded.

If you want a directory to be copied with a name other than its own, append #<name> (e.g. data/foo#bar).

If you need to dynamically generate a list of files, override dirs() instead.

New in version 0.6.4.

MRJob.ARCHIVES = []

Optional list of archives to upload and unpack in the job’s working directory. These can be URIs or paths on the local filesystem.

Relative paths will be interpreted as relative to the directory containing the script (not the current working directory). Environment variables and ~ in paths will be expanded.

By default, the directory will have the same name as the archive (e.g. foo.tar.gz/). To change the directory’s name, append #<name>:

ARCHIVES = ['data/foo.tar.gz#foo']

If you need to dynamically generate a list of files, override archives() instead.

New in version 0.6.4.

MRJob.files()

Like FILES, except that it can return a dynamically generated list of files to upload. Overriding this method disables FILES.

Paths returned by this method are relative to the working directory (not the script). Note that the job runner will always expand environment variables and ~ in paths returned by this method.

You do not have to worry about inadvertently disabling --files; this switch is handled separately.

New in version 0.6.4.

MRJob.dirs()

Like DIRS, except that it can return a dynamically generated list of directories to upload. Overriding this method disables DIRS.

Paths returned by this method are relative to the working directory (not the script). Note that the job runner will always expand environment variables and ~ in paths returned by this method.

You do not have to worry about inadvertently disabling --dirs; this switch is handled separately.

New in version 0.6.4.

MRJob.archives()

Like ARCHIVES, except that it can return a dynamically generated list of archives to upload and unpack. Overriding this method disables ARCHIVES.

Paths returned by this method are relative to the working directory (not the script). Note that the job runner will always expand environment variables and ~ in paths returned by this method.

You do not have to worry about inadvertently disabling --archives; this switch is handled separately.

New in version 0.6.4.

Job runner configuration

classmethod MRJob.mr_job_script()

Path of this script. This returns the file containing this class, or None if there isn’t any (e.g. it was defined from the command line interface.)

Running specific parts of jobs

MRJob.run_job()

Run all steps of the job, logging errors (and debugging output if --verbose is specified) to STDERR and streaming the output to STDOUT.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.run_mapper(step_num=0)

Run the mapper and final mapper action for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.map_pairs(pairs, step_num=0)

Runs mapper_init(), mapper()/mapper_raw(), and mapper_final() for one map task in one step.

Takes in a sequence of (key, value) pairs as input, and yields (key, value) pairs as output.

run_mapper() essentially wraps this method with code to handle reading/decoding input and writing/encoding output.

New in version 0.6.7.

MRJob.run_reducer(step_num=0)

Run the reducer for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.reduce_pairs(pairs, step_num=0)

Runs reducer_init(), reducer(), and reducer_final() for one reduce task in one step.

Takes in a sequence of (key, value) pairs as input, and yields (key, value) pairs as output.

run_reducer() essentially wraps this method with code to handle reading/decoding input and writing/encoding output.

New in version 0.6.7.

MRJob.run_combiner(step_num=0)

Run the combiner for the given step.

Parameters: step_num (int) – which step to run (0-indexed)

If we encounter a line that can’t be decoded by our input protocol, or a tuple that can’t be encoded by our output protocol, we’ll increment a counter rather than raising an exception. If --strict-protocols is set, an exception is raised instead.

Called from run(). You’d probably only want to call this directly from automated tests.

MRJob.combine_pairs(pairs, step_num=0)

Runs combiner_init(), combiner(), and combiner_final() for one combiner task in one step.

Takes in a sequence of (key, value) pairs as input, and yields (key, value) pairs as output.

run_combiner() essentially wraps this method with code to handle reading/decoding input and writing/encoding output.

New in version 0.6.7.

Hadoop configuration

MRJob.HADOOP_INPUT_FORMAT = None

Optional name of a Hadoop InputFormat class, e.g. 'org.apache.hadoop.mapred.lib.NLineInputFormat'.

Passed to Hadoop with the first step of this job with the -inputformat option.

If you require more sophisticated behavior, try hadoop_input_format() or the hadoop_input_format argument to mrjob.runner.MRJobRunner.__init__().

MRJob.hadoop_input_format()

Optional Hadoop InputFormat class to parse input for the first step of the job.

Normally, setting HADOOP_INPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.

MRJob.HADOOP_OUTPUT_FORMAT = None

Optional name of a Hadoop OutputFormat class, e.g. 'org.apache.hadoop.mapred.FileOutputFormat'.

Passed to Hadoop with the last step of this job with the -outputformat option.

If you require more sophisticated behavior, try hadoop_output_format() or the hadoop_output_format argument to mrjob.runner.MRJobRunner.__init__().

MRJob.hadoop_output_format()

Optional Hadoop OutputFormat class to write output for the last step of the job.

Normally, setting HADOOP_OUTPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.

MRJob.JOBCONF = {}

Optional jobconf arguments we should always pass to Hadoop. This is a map from property name to value. e.g.:

{'stream.num.map.output.key.fields': '4'}

It’s recommended that you only use this to hard-code things that affect the semantics of your job, and leave performance tweaks to the command line or whatever you use to launch your job.

MRJob.jobconf()

-D args to pass to hadoop streaming. This should be a map from property name to value. By default, returns JOBCONF.

Changed in version 0.6.6: re-defining this no longer clobbers command-line --jobconf options.
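
A hedged sketch of re-defining jobconf() to extend JOBCONF rather than replace it (the property value is illustrative, taken from the JOBCONF example above):

def jobconf(self):
    # start from JOBCONF (the default return value) and add to it
    conf = dict(super(MRYourJob, self).jobconf())
    conf['stream.num.map.output.key.fields'] = '4'
    return conf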

MRJob.LIBJARS = []

Optional list of paths of jar files to run our job with using Hadoop’s -libjars option.

~ and environment variables in paths will be expanded, and relative paths will be interpreted as relative to the directory containing the script (not the current working directory).

If you require more sophisticated behavior, try overriding libjars().

MRJob.libjars()

Optional list of paths of jar files to run our job with using Hadoop’s -libjars option. Normally setting LIBJARS is sufficient. Paths from LIBJARS are interpreted as relative to the directory containing the script (paths from the command-line are relative to the current working directory).

Note that ~ and environment variables in paths will always be expanded by the job runner (see libjars).

Changed in version 0.6.6: re-defining this no longer clobbers the command-line --libjars option

MRJob.PARTITIONER = None

Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers. For example: 'org.apache.hadoop.mapred.lib.HashPartitioner'.

If you require more sophisticated behavior, try partitioner().

MRJob.partitioner()

Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers.

By default, returns PARTITIONER.

You probably don’t need to re-define this; it’s just here for completeness.

Hooks for testing

MRJob.sandbox(stdin=None, stdout=None, stderr=None)

Redirect stdin, stdout, and stderr for automated testing.

You can set stdin, stdout, and stderr to file objects. By default, they’ll be set to empty BytesIO objects. You can then access the job’s file handles through self.stdin, self.stdout, and self.stderr. See Testing jobs for more information about testing.

You may call sandbox multiple times (this will essentially clear the file handles).

stdin is empty by default. You can set it to anything that yields lines:

mr_job.sandbox(stdin=BytesIO(b'some_data\n'))

or, equivalently:

mr_job.sandbox(stdin=[b'some_data\n'])

For convenience, sandbox() returns self, so you can do:

mr_job = MRJobClassToTest().sandbox()

Simple testing example:

mr_job = MRYourJob().sandbox()
self.assertEqual(list(mr_job.reducer('foo', ['a', 'b'])), [...])

More complex testing example:

from io import BytesIO

from mrjob.parse import parse_mr_job_stderr
from mrjob.protocol import JSONProtocol

mr_job = MRYourJob(args=[...])

fake_input = b'"foo"\t"bar"\n"foo"\t"baz"\n'
mr_job.sandbox(stdin=BytesIO(fake_input))

mr_job.run_reducer(step_num=0)

self.assertEqual(mr_job.stdout.getvalue(), ...)
self.assertEqual(parse_mr_job_stderr(mr_job.stderr), ...)

Note

If you are using Spark, it’s recommended you only pass in io.BytesIO or other serializable alternatives to file objects. stdin, stdout, and stderr get stored as job attributes, which means if they aren’t serializable, neither is the job instance or its methods.