mrjob.job - defining your job

- class mrjob.job.MRJob(args=None)

  The base class for all MapReduce jobs. See __init__() for details.
One-step jobs

- MRJob.mapper(key, value)

  Re-define this to define the mapper for a one-step job.

  Yields zero or more tuples of (out_key, out_value).

  Parameters:
  - key – A value parsed from input.
  - value – A value parsed from input.

  If you don't re-define this, your job will have a mapper that simply yields (key, value) as-is.

  By default (if you don't mess with Protocols):
  - key will be None
  - value will be the raw input line, with newline stripped.
  - out_key and out_value must be JSON-encodable: numeric, unicode, boolean, None, list, or dict whose keys are unicodes.
- MRJob.reducer(key, values)

  Re-define this to define the reducer for a one-step job.

  Yields one or more tuples of (out_key, out_value).

  Parameters:
  - key – A key which was yielded by the mapper
  - values – A generator which yields all values yielded by the mapper which correspond to key.

  By default (if you don't mess with Protocols):
  - out_key and out_value must be JSON-encodable.
  - key and values will have been decoded from JSON (so tuples will become lists).
- MRJob.combiner(key, values)

  Re-define this to define the combiner for a one-step job.

  Yields one or more tuples of (out_key, out_value).

  Parameters:
  - key – A key which was yielded by the mapper
  - values – A generator which yields all values yielded by one mapper task/node which correspond to key.

  By default (if you don't mess with Protocols):
  - out_key and out_value must be JSON-encodable.
  - key and values will have been decoded from JSON (so tuples will become lists).
- MRJob.mapper_init()

  Re-define this to define an action to run before the mapper processes any input.

  One use for this function is to initialize mapper-specific helper structures.

  Yields one or more tuples of (out_key, out_value).

  By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

- MRJob.mapper_final()

  Re-define this to define an action to run after the mapper reaches the end of input.

  One way to use this is to store a total in an instance variable, and output it after reading all input data. See mrjob.examples for an example.

  Yields one or more tuples of (out_key, out_value).

  By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.
- MRJob.reducer_init()

  Re-define this to define an action to run before the reducer processes any input.

  One use for this function is to initialize reducer-specific helper structures.

  Yields one or more tuples of (out_key, out_value).

  By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

- MRJob.reducer_final()

  Re-define this to define an action to run after the reducer reaches the end of input.

  Yields one or more tuples of (out_key, out_value).

  By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

- MRJob.combiner_init()

  Re-define this to define an action to run before the combiner processes any input.

  One use for this function is to initialize combiner-specific helper structures.

  Yields one or more tuples of (out_key, out_value).

  By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.

- MRJob.combiner_final()

  Re-define this to define an action to run after the combiner reaches the end of input.

  Yields one or more tuples of (out_key, out_value).

  By default, out_key and out_value must be JSON-encodable; re-define INTERNAL_PROTOCOL to change this.
- MRJob.mapper_cmd()

  Re-define this to define the mapper for a one-step job as a shell command. If you define your mapper this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For important specifics, see Shell commands as steps.

  Basic example:

      def mapper_cmd(self):
          return 'cat'

- MRJob.reducer_cmd()

  Re-define this to define the reducer for a one-step job as a shell command. If you define your reducer this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For specifics, see Shell commands as steps.

  Basic example:

      def reducer_cmd(self):
          return 'cat'

- MRJob.combiner_cmd()

  Re-define this to define the combiner for a one-step job as a shell command. If you define your combiner this way, the command will be passed unchanged to Hadoop Streaming, with some minor exceptions. For specifics, see Shell commands as steps.

  Basic example:

      def combiner_cmd(self):
          return 'cat'
- MRJob.mapper_pre_filter()

  Re-define this to specify a shell command to filter the mapper's input before it gets to your job's mapper in a one-step job. For important specifics, see Filtering task input with shell commands.

  Basic example:

      def mapper_pre_filter(self):
          return 'grep "ponies"'

- MRJob.reducer_pre_filter()

  Re-define this to specify a shell command to filter the reducer's input before it gets to your job's reducer in a one-step job. For important specifics, see Filtering task input with shell commands.

  Basic example:

      def reducer_pre_filter(self):
          return 'grep "ponies"'

- MRJob.combiner_pre_filter()

  Re-define this to specify a shell command to filter the combiner's input before it gets to your job's combiner in a one-step job. For important specifics, see Filtering task input with shell commands.

  Basic example:

      def combiner_pre_filter(self):
          return 'grep "ponies"'
- MRJob.mapper_raw(input_path, input_uri)

  Re-define this to make Hadoop pass one input file to each mapper.

  Parameters:
  - input_path – a local path that the input file has been copied to
  - input_uri – the URI of the input file on HDFS, S3, etc.

  New in version 0.6.3.
- MRJob.spark(input_path, output_path)

  Re-define this with Spark code to run. You can read input from input_path and write output to output_path.

  Warning: Prior to v0.6.8, to pass job methods into Spark (e.g. rdd.flatMap(self.some_method)), you first had to call self.sandbox(); otherwise Spark would raise an error because self was not serializable.
Multi-step jobs

- MRJob.steps()

  Re-define this to make a multi-step job.

  If you don't re-define this, we'll automatically create a one-step job using any of mapper(), mapper_init(), mapper_final(), reducer_init(), reducer_final(), and reducer() that you've re-defined. For example:

      def steps(self):
          return [
              MRStep(mapper=self.transform_input,
                     reducer=self.consolidate_1),
              MRStep(reducer_init=self.log_mapper_init,
                     reducer=self.consolidate_2),
          ]

  Returns: a list of steps constructed with MRStep or other classes in mrjob.step.
Running the job

- classmethod MRJob.run()

  Entry point for running the job from the command line.

  This is also the entry point when a mapper or reducer is run by Hadoop Streaming.

  Does one of:
  - Run a mapper (--mapper). See run_mapper()
  - Run a combiner (--combiner). See run_combiner()
  - Run a reducer (--reducer). See run_reducer()
  - Run the entire job. See run_job()
- MRJob.__init__(args=None)

  Entry point for running your job from other Python code.

  You can pass in command-line arguments, and the job will act the same way it would if it were run from the command line. For example, to run your job on EMR:

      mr_job = MRYourJob(args=['-r', 'emr'])
      with mr_job.make_runner() as runner:
          ...

  Passing in None is the same as passing in sys.argv[1:].

  For a full list of command-line arguments, run: python -m mrjob.job --help

  Parameters: args – Arguments to your script (switches and input files)

  Changed in version 0.7.0: Previously, args set to None was equivalent to [].
- MRJob.make_runner()

  Make a runner based on command-line arguments, so we can launch this job on EMR, on Hadoop, or locally.

  Return type: mrjob.runner.MRJobRunner
Parsing output

- MRJob.parse_output(chunks)

  Parse the final output of this MRJob (as a stream of byte chunks) into a stream of (key, value) pairs.
Counters and status messages

- MRJob.increment_counter(group, counter, amount=1)

  Increment a counter in Hadoop Streaming by printing to stderr.

  Commas in counter or group will be automatically replaced with semicolons (commas confuse Hadoop Streaming).
- MRJob.set_status(msg)

  Set the job status in Hadoop Streaming by printing to stderr.

  This is also a good way of doing a keepalive for a job that goes a long time between outputs; Hadoop Streaming usually times out jobs that give no output for longer than 10 minutes.
Setting protocols

- MRJob.INPUT_PROTOCOL = <class 'mrjob.protocol.BytesValueProtocol'>

  Protocol for reading input to the first mapper in your job. Default: RawValueProtocol.

  For example, if you knew your input data were in JSON format, you could set:

      INPUT_PROTOCOL = JSONValueProtocol

  in your class, and your initial mapper would receive decoded JSON values rather than strings.

  See mrjob.protocol for the full list of protocols.
- MRJob.INTERNAL_PROTOCOL = <class 'mrjob.protocol.StandardJSONProtocol'>

  Protocol for communication between steps and final output. Default: JSONProtocol.

  For example, if your step output weren't JSON-encodable, you could set:

      INTERNAL_PROTOCOL = PickleProtocol

  and step output would be encoded as string-escaped pickles.

  See mrjob.protocol for the full list of protocols.
- MRJob.OUTPUT_PROTOCOL = <class 'mrjob.protocol.StandardJSONProtocol'>

  Protocol to use for writing output. Default: JSONProtocol.

  For example, if you wanted the final output in repr, you could set:

      OUTPUT_PROTOCOL = ReprProtocol

  See mrjob.protocol for the full list of protocols.
- MRJob.input_protocol()

  Instance of the protocol to use to convert input lines to Python objects. Default behavior is to return an instance of INPUT_PROTOCOL.

- MRJob.internal_protocol()

  Instance of the protocol to use to communicate between steps. Default behavior is to return an instance of INTERNAL_PROTOCOL.

- MRJob.output_protocol()

  Instance of the protocol to use to convert Python objects to output lines. Default behavior is to return an instance of OUTPUT_PROTOCOL.
- MRJob.pick_protocols(step_num, step_type)

  Pick the protocol classes to use for reading and writing for the given step.

  Returns: (read_function, write_function)

  By default, we use one protocol for reading input, one internal protocol for communication between steps, and one protocol for final output (which is usually the same as the internal protocol). Protocols can be controlled by setting INPUT_PROTOCOL, INTERNAL_PROTOCOL, and OUTPUT_PROTOCOL.

  Re-define this if you need fine control over which protocols are used by which steps.
Secondary sort

- MRJob.SORT_VALUES = None

  Set this to True if you would like reducers to receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.

  This can be useful if you expect more values than you can fit in memory to be associated with one key, but you want to apply information in a small subset of these values to information in the other values. For example, you may want to convert counts to percentages, and to do this you first need to know the total count.

  Even though values are sorted by their encoded value, most encodings will sort strings in order. For example, you could have values like ['A', <total>] and ['B', <count_name>, <count>], and the value containing the total should come first regardless of what protocol you're using.

  See jobconf() and partitioner() for more about how secondary sort is implemented.
Command-line options

See Defining command line options for information on adding command line options to your job. See Configuration quick reference for a complete list of all configuration options.

- MRJob.configure_args()

  Define arguments for this script. Called from __init__().

  Re-define to define custom command-line arguments or pass through existing ones:

      def configure_args(self):
          super(MRYourJob, self).configure_args()

          self.add_passthru_arg(...)
          self.add_file_arg(...)
          self.pass_arg_through(...)
          ...
- MRJob.add_passthru_arg(*args, **kwargs)

  Function to create options which both the job runner and the job itself respect (we use this for protocols, for example).

  Use it like you would use argparse.ArgumentParser.add_argument():

      def configure_args(self):
          super(MRYourJob, self).configure_args()
          self.add_passthru_arg(
              '--max-ngram-size', type=int, default=4, help='...')

  If you want to pass files through to the mapper/reducer, use add_file_arg() instead.

  If you want to pass through a built-in option (e.g. --runner), use pass_arg_through() instead.
- MRJob.add_file_arg(*args, **kwargs)

  Add a command-line option that sends an external file (e.g. a SQLite DB) to Hadoop:

      def configure_args(self):
          super(MRYourJob, self).configure_args()
          self.add_file_arg('--scoring-db', help=...)

  This does the right thing: the file will be uploaded to the working dir of the script on Hadoop, and the script will be passed the same option, but with the local name of the file in the script's working directory.

  Note: If you pass a file to a job, best practice is to lazy-load its contents (e.g. make a method that opens the file the first time you call it) rather than loading it in your job's constructor or load_args(). Not only is this more efficient, it's necessary if you want to run your job in a Spark executor (because the file may not be in the same place in a Spark driver).

  Note: We suggest against sending Berkeley DBs to your job, as Berkeley DB is not forwards-compatible (so a Berkeley DB that you construct on your computer may not be readable from within Hadoop). Use SQLite databases instead. If all you need is an on-disk hash table, try out the sqlite3dbm module.

  Changed in version 0.6.6: now accepts explicit type=str.

  Changed in version 0.6.8: fully supported on Spark, including the local[*] master.

- MRJob.pass_arg_through(opt_str)

  Pass the given argument through to the job.
- MRJob.load_args(args)

  Load command-line options into self.options.

  Called from __init__() after configure_args().

  Parameters: args (list of str) – a list of command line arguments. None will be treated the same as [].

  Re-define if you want to post-process command-line arguments:

      def load_args(self, args):
          super(MRYourJob, self).load_args(args)
          self.stop_words = self.options.stop_words.split(',')
          ...
- MRJob.is_task()

  True if this is a mapper, combiner, reducer, or Spark script.

  This is mostly useful inside load_args(), to disable loading args when we aren't running inside Hadoop.
Uploading support files

- MRJob.FILES = []

  Optional list of files to upload to the job's working directory. These can be URIs or paths on the local filesystem.

  Relative paths will be interpreted as relative to the directory containing the script (not the current working directory). Environment variables and ~ in paths will be expanded.

  If you want a file to be uploaded to a filename other than its own, append #<name> (e.g. data/foo.json#bar.json).

  If you need to dynamically generate a list of files, override files() instead.

  New in version 0.6.4.
- MRJob.DIRS = []

  Optional list of directories to upload to the job's working directory. These can be URIs or paths on the local filesystem.

  Relative paths will be interpreted as relative to the directory containing the script (not the current working directory). Environment variables and ~ in paths will be expanded.

  If you want a directory to be copied with a name other than its own, append #<name> (e.g. data/foo#bar).

  If you need to dynamically generate a list of directories, override dirs() instead.

  New in version 0.6.4.
- MRJob.ARCHIVES = []

  Optional list of archives to upload and unpack in the job's working directory. These can be URIs or paths on the local filesystem.

  Relative paths will be interpreted as relative to the directory containing the script (not the current working directory). Environment variables and ~ in paths will be expanded.

  By default, the directory will have the same name as the archive (e.g. foo.tar.gz/). To change the directory's name, append #<name>:

      ARCHIVES = ['data/foo.tar.gz#foo']

  If you need to dynamically generate a list of archives, override archives() instead.

  New in version 0.6.4.
- MRJob.files()

  Like FILES, except that it can return a dynamically generated list of files to upload. Overriding this method disables FILES.

  Paths returned by this method are relative to the working directory (not the script). Note that the job runner will always expand environment variables and ~ in paths returned by this method.

  You do not have to worry about inadvertently disabling --files; this switch is handled separately.

  New in version 0.6.4.

- MRJob.dirs()

  Like DIRS, except that it can return a dynamically generated list of directories to upload. Overriding this method disables DIRS.

  Paths returned by this method are relative to the working directory (not the script). Note that the job runner will always expand environment variables and ~ in paths returned by this method.

  You do not have to worry about inadvertently disabling --dirs; this switch is handled separately.

  New in version 0.6.4.

- MRJob.archives()

  Like ARCHIVES, except that it can return a dynamically generated list of archives to upload and unpack. Overriding this method disables ARCHIVES.

  Paths returned by this method are relative to the working directory (not the script). Note that the job runner will always expand environment variables and ~ in paths returned by this method.

  You do not have to worry about inadvertently disabling --archives; this switch is handled separately.

  New in version 0.6.4.
Job runner configuration

- classmethod MRJob.mr_job_script()

  Path of this script. This returns the file containing this class, or None if there isn't any (e.g. it was defined from the command line interface).
Running specific parts of jobs

- MRJob.run_job()

  Run all the steps of the job, logging errors (and debugging output if --verbose is specified) to STDERR and streaming the output to STDOUT.

  Called from run(). You'd probably only want to call this directly from automated tests.
- MRJob.run_mapper(step_num=0)

  Run the mapper and final mapper action for the given step.

  Parameters: step_num (int) – which step to run (0-indexed)

  Called from run(). You'd probably only want to call this directly from automated tests.
- MRJob.map_pairs(pairs, step_num=0)

  Runs mapper_init(), mapper()/mapper_raw(), and mapper_final() for one map task in one step.

  Takes in a sequence of (key, value) pairs as input, and yields (key, value) pairs as output. run_mapper() essentially wraps this method with code to handle reading/decoding input and writing/encoding output.

  New in version 0.6.7.
- MRJob.run_reducer(step_num=0)

  Run the reducer for the given step.

  Parameters: step_num (int) – which step to run (0-indexed)

  Called from run(). You'd probably only want to call this directly from automated tests.

- MRJob.reduce_pairs(pairs, step_num=0)

  Runs reducer_init(), reducer(), and reducer_final() for one reduce task in one step.

  Takes in a sequence of (key, value) pairs as input, and yields (key, value) pairs as output. run_reducer() essentially wraps this method with code to handle reading/decoding input and writing/encoding output.

  New in version 0.6.7.
- MRJob.run_combiner(step_num=0)

  Run the combiner for the given step.

  Parameters: step_num (int) – which step to run (0-indexed)

  If we encounter a line that can't be decoded by our input protocol, or a tuple that can't be encoded by our output protocol, we'll increment a counter rather than raising an exception. If --strict-protocols is set, then an exception is raised.

  Called from run(). You'd probably only want to call this directly from automated tests.
- MRJob.combine_pairs(pairs, step_num=0)

  Runs combiner_init(), combiner(), and combiner_final() for one combiner task in one step.

  Takes in a sequence of (key, value) pairs as input, and yields (key, value) pairs as output. run_combiner() essentially wraps this method with code to handle reading/decoding input and writing/encoding output.

  New in version 0.6.7.
Hadoop configuration

- MRJob.HADOOP_INPUT_FORMAT = None

  Optional name of a Hadoop InputFormat class, e.g. 'org.apache.hadoop.mapred.lib.NLineInputFormat'.

  Passed to Hadoop with the first step of this job with the -inputformat option.

  If you require more sophisticated behavior, try hadoop_input_format() or the hadoop_input_format argument to mrjob.runner.MRJobRunner.__init__().

- MRJob.hadoop_input_format()

  Optional Hadoop InputFormat class to parse input for the first step of the job.

  Normally, setting HADOOP_INPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.

- MRJob.HADOOP_OUTPUT_FORMAT = None

  Optional name of a Hadoop OutputFormat class, e.g. 'org.apache.hadoop.mapred.FileOutputFormat'.

  Passed to Hadoop with the last step of this job with the -outputformat option.

  If you require more sophisticated behavior, try hadoop_output_format() or the hadoop_output_format argument to mrjob.runner.MRJobRunner.__init__().

- MRJob.hadoop_output_format()

  Optional Hadoop OutputFormat class to write output for the last step of the job.

  Normally, setting HADOOP_OUTPUT_FORMAT is sufficient; redefining this method is only for when you want to get fancy.
- MRJob.JOBCONF = {}

  Optional jobconf arguments we should always pass to Hadoop. This is a map from property name to value, e.g.:

      {'stream.num.map.output.key.fields': '4'}

  It's recommended that you only use this to hard-code things that affect the semantics of your job, and leave performance tweaks to the command line or whatever you use to launch your job.

- MRJob.jobconf()

  -D args to pass to Hadoop Streaming. This should be a map from property name to value. By default, returns JOBCONF.

  Changed in version 0.6.6: re-defining this no longer clobbers command-line --jobconf options.
- MRJob.LIBJARS = []

  Optional list of paths of jar files to run our job with, using Hadoop's -libjars option. ~ and environment variables in paths will be expanded, and relative paths will be interpreted as relative to the directory containing the script (not the current working directory).

  If you require more sophisticated behavior, try overriding libjars().

- MRJob.libjars()

  Optional list of paths of jar files to run our job with, using Hadoop's -libjars option. Normally setting LIBJARS is sufficient. Paths from LIBJARS are interpreted as relative to the directory containing the script (paths from the command line are relative to the current working directory).

  Note that ~ and environment variables in paths will always be expanded by the job runner (see libjars).

  Changed in version 0.6.6: re-defining this no longer clobbers the command-line --libjars option.
- MRJob.PARTITIONER = None

  Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers, e.g. 'org.apache.hadoop.mapred.lib.HashPartitioner'.

  If you require more sophisticated behavior, try partitioner().

- MRJob.partitioner()

  Optional Hadoop partitioner class to use to determine how mapper output should be sorted and distributed to reducers.

  By default, returns PARTITIONER.

  You probably don't need to re-define this; it's just here for completeness.
Hooks for testing

- MRJob.sandbox(stdin=None, stdout=None, stderr=None)

  Redirect stdin, stdout, and stderr for automated testing.

  You can set stdin, stdout, and stderr to file objects. By default, they'll be set to empty BytesIO objects. You can then access the job's file handles through self.stdin, self.stdout, and self.stderr. See Testing jobs for more information about testing.

  You may call sandbox multiple times (this will essentially clear the file handles).

  stdin is empty by default. You can set it to anything that yields lines:

      mr_job.sandbox(stdin=BytesIO(b'some_data\n'))

  or, equivalently:

      mr_job.sandbox(stdin=[b'some_data\n'])

  For convenience, sandbox() returns self, so you can do:

      mr_job = MRJobClassToTest().sandbox()

  Simple testing example:

      mr_job = MRYourJob().sandbox()
      self.assertEqual(list(mr_job.reducer('foo', ['a', 'b'])), [...])

  More complex testing example:

      from io import BytesIO

      from mrjob.parse import parse_mr_job_stderr
      from mrjob.protocol import JSONProtocol

      mr_job = MRYourJob(args=[...])

      fake_input = b'"foo"\t"bar"\n"foo"\t"baz"\n'
      mr_job.sandbox(stdin=BytesIO(fake_input))

      mr_job.run_reducer(step_num=0)

      self.assertEqual(mr_job.stdout.getvalue(), ...)
      self.assertEqual(parse_mr_job_stderr(mr_job.stderr), ...)

  Note: If you are using Spark, it's recommended you only pass in io.BytesIO or other serializable alternatives to file objects. stdin, stdout, and stderr get stored as job attributes, which means if they aren't serializable, neither is the job instance or its methods.