mrjob.step - represent Job Steps

Representations of job steps, to use in your MRJob's steps() method.

Because the runner just needs to know how to invoke your MRJob script, not how it works internally, each step instance's description() method produces a simplified, JSON-able description of the step to pass to the runner.

Steps

class mrjob.step.MRStep(**kwargs)

Represents steps handled by the script containing your job.

Used by MRJob.steps. See Multi-step jobs for sample usage.

Takes the following keyword arguments: combiner, combiner_cmd, combiner_final, combiner_init, combiner_pre_filter, mapper, mapper_cmd, mapper_final, mapper_init, mapper_pre_filter, mapper_raw, reducer, reducer_cmd, reducer_final, reducer_init, reducer_pre_filter. These should be set to None or a function with the same signature as the corresponding method in MRJob.

Also accepts jobconf, a dictionary with custom jobconf arguments to pass to Hadoop.
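
For reference, a minimal two-step job built from MRStep might look like the sketch below (the class and method names are illustrative, not part of mrjob):

from mrjob.job import MRJob
from mrjob.step import MRStep


class MRMostUsedWord(MRJob):
    # illustrative two-step job: count words, then pick the most common one

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word),
        ]

    def mapper_get_words(self, _, line):
        for word in line.split():
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        # re-key everything under None so the second step sees a single group
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, count_word_pairs):
        yield max(count_word_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()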

An MRStep's description looks like:

{
    'type': 'streaming',
    'mapper': { ... },
    'combiner': { ... },
    'reducer': { ... },
    'jobconf': { ... },  # dict of Hadoop configuration properties
}

At least one of mapper, combiner, and reducer must be included; jobconf is entirely optional.

mapper, combiner, and reducer are either handled by the script containing your job definition, in which case they look like:

{
    'type': 'script',
    'pre_filter': 'grep -v bad', # optional cmd to filter input
}

or they simply run a command, which looks like:

{
    'type': 'command',
    'command': 'cut -f 1-2', # command to run, as a string
}
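
Putting those pieces together: a step whose mapper is defined in the script but pre-filtered by a shell command, and whose reducer is just a command, could be declared like this (the filter and command strings are only illustrative):

from mrjob.job import MRJob
from mrjob.step import MRStep


class MRFilterAndCut(MRJob):
    # illustrative job mixing a 'script' mapper (with a pre-filter)
    # and a 'command' reducer

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                # shell command run over the mapper's raw input lines;
                # appears as 'pre_filter' in the mapper's 'script' description
                mapper_pre_filter='grep -v bad',
                # the reducer is just a command, so its description is
                # {'type': 'command', 'command': 'cut -f 1-2'}
                reducer_cmd='cut -f 1-2',
            ),
        ]

    def mapper(self, _, line):
        yield line, 1


if __name__ == '__main__':
    MRFilterAndCut.run()
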
class mrjob.step.JarStep(jar, **kwargs)

Represents running a custom Jar as a step.

Accepts the following keyword arguments:

Parameters:
  • jar – The local path to the Jar. On EMR, this can also be an s3:// URI, or file:// to reference a jar on the local filesystem of your EMR instance(s).
  • args – (optional) A list of arguments to the jar. Use mrjob.step.INPUT and OUTPUT to interpolate input and output paths.
  • jobconf – (optional) A dictionary of Hadoop properties
  • main_class – (optional) The main class to run from the jar. If not specified, Hadoop will use the main class in the jar’s manifest file.

jar can also be passed as a positional argument.

See Jar steps for sample usage.

Sample description of a JarStep:

{
    'type': 'jar',
    'jar': 'binks.jar.jar',
    'main_class': 'MyMainMan',  # optional
    'args': ['argh', 'argh'],  # optional
    'jobconf': { ... },  # optional
}

To give your jar access to input files, an empty output directory, configuration properties, and libjars managed by mrjob, you may include INPUT, OUTPUT, and GENERIC_ARGS in args.
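
A sketch of a job whose only step runs a jar (the jar path and main class below are hypothetical):

from mrjob.job import MRJob
from mrjob.step import JarStep, GENERIC_ARGS, INPUT, OUTPUT


class MRRunMyJar(MRJob):
    # illustrative job that runs a custom jar as its only step

    def steps(self):
        return [
            JarStep(
                jar='my-hadoop-tool.jar',         # hypothetical local jar
                main_class='com.example.MyTool',  # hypothetical main class
                # mrjob replaces these constants with -D/-libjars args
                # and the step's actual input/output paths
                args=[GENERIC_ARGS, INPUT, OUTPUT],
            ),
        ]


if __name__ == '__main__':
    MRRunMyJar.run()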

class mrjob.step.SparkStep(spark, **kwargs)

Represents running a Spark step defined in your job.

Accepts the following keyword arguments:

Parameters:
  • spark – function containing your Spark code, with the same signature as spark()
  • jobconf – (optional) A dictionary of Hadoop properties
  • spark_args – (optional) an array of arguments to pass to spark-submit (e.g. ['--executor-memory', '2G']).

Sample description of a SparkStep:

{
    'type': 'spark',
    'jobconf': { ... },  # optional
    'spark_args': ['--executor-memory', '2G'],  # optional
}
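
A sketch of a job that declares an explicit SparkStep (this is the familiar word count, written with pyspark):

from mrjob.job import MRJob
from mrjob.step import SparkStep


class MRSparkWordcount(MRJob):
    # illustrative job whose only step is a Spark step defined in the script

    def steps(self):
        return [
            SparkStep(
                spark=self.spark,
                spark_args=['--executor-memory', '2G'],  # passed to spark-submit
            ),
        ]

    def spark(self, input_path, output_path):
        # pyspark is only importable inside the Spark step itself
        from pyspark import SparkContext

        sc = SparkContext(appName='mrjob Spark wordcount')

        (sc.textFile(input_path)
            .flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b)
            .saveAsTextFile(output_path))

        sc.stop()


if __name__ == '__main__':
    MRSparkWordcount.run()
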
class mrjob.step.SparkJarStep(jar, main_class, **kwargs)

Represents running a separate Jar through Spark.

Accepts the following keyword arguments:

Parameters:
  • jar – The local path to the jar to run. On EMR, this can also be an s3:// URI, or file:// to reference a jar on the local filesystem of your EMR instance(s).
  • main_class – Your application’s main class (e.g. 'org.apache.spark.examples.SparkPi')
  • args – (optional) A list of arguments to the script. Use mrjob.step.INPUT and OUTPUT to interpolate input and output paths.
  • jobconf – (optional) A dictionary of Hadoop properties
  • spark_args – (optional) an array of arguments to pass to spark-submit (e.g. ['--executor-memory', '2G']).

jar and main_class can also be passed as positional arguments.

Sample description of a SparkJarStep:

{
    'type': 'spark_jar',
    'jar': 'binks.jar.jar',
    'main_class': 'MyMainMan',
    'args': ['argh', 'argh'],  # optional
    'jobconf': { ... },  # optional
    'spark_args': ['--executor-memory', '2G'],  # optional
}

To give your Spark JAR access to input files and an empty output directory managed by mrjob, you may include INPUT and OUTPUT in args.
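
A sketch of a job whose only step submits an existing Spark jar (the jar path and main class are hypothetical):

from mrjob.job import MRJob
from mrjob.step import SparkJarStep, INPUT, OUTPUT


class MRRunSparkJar(MRJob):
    # illustrative job that submits a pre-built Spark jar as its only step

    def steps(self):
        return [
            SparkJarStep(
                jar='my-spark-analysis.jar',          # hypothetical jar
                main_class='com.example.MyAnalysis',  # hypothetical main class
                # replaced with the step's actual input/output paths
                args=[INPUT, OUTPUT],
                spark_args=['--executor-memory', '2G'],  # optional
            ),
        ]


if __name__ == '__main__':
    MRRunSparkJar.run()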

class mrjob.step.SparkScriptStep(script, **kwargs)

Represents running a separate Python script through Spark.

Accepts the following keyword arguments:

Parameters:
  • script – The local path to the Python script to run. On EMR, this can also be an s3:// URI, or file:// to reference a script on the local filesystem of your EMR instance(s).
  • args – (optional) A list of arguments to the script. Use mrjob.step.INPUT and OUTPUT to interpolate input and output paths.
  • jobconf – (optional) A dictionary of Hadoop properties
  • spark_args – (optional) an array of arguments to pass to spark-submit (e.g. ['--executor-memory', '2G']).

script can also be passed as a positional argument.

Sample description of a SparkScriptStep:

{
    'type': 'spark_script',
    'script': 'my_spark_script.py',
    'args': ['script_arg1', 'script_arg2'],
    'jobconf': { ... },  # optional
    'spark_args': ['--executor-memory', '2G'],  # optional
}

To give your Spark script access to input files and an empty output directory managed by mrjob, you may include INPUT and OUTPUT in args.
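
A sketch of a job whose only step runs a standalone PySpark script (the script path and extra flag are hypothetical):

from mrjob.job import MRJob
from mrjob.step import SparkScriptStep, INPUT, OUTPUT


class MRRunSparkScript(MRJob):
    # illustrative job that runs a separate PySpark script as its only step

    def steps(self):
        return [
            SparkScriptStep(
                script='my_spark_script.py',  # hypothetical script path
                # mrjob swaps INPUT/OUTPUT for the step's actual paths;
                # '--verbose' is a hypothetical flag the script understands
                args=[INPUT, OUTPUT, '--verbose'],
                spark_args=['--executor-memory', '2G'],  # optional
            ),
        ]


if __name__ == '__main__':
    MRRunSparkScript.run()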

Argument interpolation

Use these constants in your step’s args and mrjob will automatically replace them before running your step.

mrjob.step.INPUT = '<input>'

If passed as an argument to JarStep, SparkJarStep, or SparkScriptStep, it’ll be replaced with the step’s input path(s). If there are multiple paths, they’ll be joined with commas.

mrjob.step.OUTPUT = '<output>'

If this is passed as an argument to JarStep, SparkJarStep, or SparkScriptStep, it’ll be replaced with the step’s output path.

mrjob.step.GENERIC_ARGS = '<generic args>'

If this is passed as an argument to JarStep, it’ll be replaced with generic Hadoop args (-D and -libjars).
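
For example, a JarStep declared with all three constants (the expanded paths and properties below are made up for illustration) has its args rewritten roughly like this before the step runs:

from mrjob.step import GENERIC_ARGS, INPUT, OUTPUT

# as written in steps():
args = [GENERIC_ARGS, INPUT, OUTPUT]

# roughly what the runner passes to the jar (illustrative values):
#   ['-D', 'mapreduce.job.reduces=2', '-libjars', '/path/to/lib.jar',
#    'hdfs:///data/part-0001,hdfs:///data/part-0002',  # input paths, comma-joined
#    'hdfs:///tmp/my_job/step-output']                 # empty output directory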