mrjob.runner - base class for all runners

class mrjob.runner.MRJobRunner(mr_job_script=None, conf_paths=None, extra_args=None, file_upload_args=None, hadoop_input_format=None, hadoop_output_format=None, input_paths=None, output_dir=None, partitioner=None, sort_values=None, stdin=None, step_output_dir=None, **opts)

Abstract base class for all runners

MRJobRunner.__init__(mr_job_script=None, conf_paths=None, extra_args=None, file_upload_args=None, hadoop_input_format=None, hadoop_output_format=None, input_paths=None, output_dir=None, partitioner=None, sort_values=None, stdin=None, step_output_dir=None, **opts)

All runners take the following keyword arguments:

Parameters:
  • mr_job_script (str) – the path of the .py file containing the MRJob. If this is None, you won’t actually be able to run() the job, but other utilities (e.g. ls()) will work.
  • conf_paths (None or list) – List of config files to combine and use, or None to search for mrjob.conf in the default locations.
  • extra_args (list of str) – a list of extra cmd-line arguments to pass to the mr_job script. This is a hook to allow jobs to take additional arguments.
  • file_upload_args – a list of tuples of ('--ARGNAME', path). The file at the given path will be uploaded to the local directory of the mr_job script when it runs, and then passed into the script with --ARGNAME. Useful for passing in SQLite DBs and other configuration files to your job.
  • hadoop_input_format (str) – name of an optional Hadoop InputFormat class. Passed to Hadoop along with your first step with the -inputformat option. Note that if you write your own class, you’ll need to include it in your own custom streaming jar (see hadoop_streaming_jar).
  • hadoop_output_format (str) – name of an optional Hadoop OutputFormat class. Passed to Hadoop along with your first step with the -outputformat option. Note that if you write your own class, you’ll need to include it in your own custom streaming jar (see hadoop_streaming_jar).
  • input_paths (list of str) – Input files for your job. Supports globs and recursively walks directories (e.g. ['data/common/', 'data/training/*.gz']). If this is left blank, we’ll read from stdin.
  • output_dir (str) – An empty/non-existent directory where Hadoop should put the final output from the job. If you don’t specify an output directory, we’ll output into a subdirectory of this job’s temporary directory. You can control this from the command line with --output-dir. This option cannot be set from configuration files. If used with the hadoop runner, this path does not need to be fully qualified with hdfs:// URIs because it’s understood that it has to be on HDFS.
  • partitioner (str) – Optional name of a Hadoop partitioner class, e.g. 'org.apache.hadoop.mapred.lib.HashPartitioner'. Hadoop streaming will use this to determine how mapper output should be sorted and distributed to reducers.
  • sort_values (bool) – if true, set partitioners and jobconf variables so that reducers receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.
  • stdin – an iterable (can be a BytesIO or even a list) to use as stdin. This is a hook for testing; if you set stdin via sandbox(), it’ll get passed through to the runner. If for some reason your lines are missing newlines, we’ll add them; this makes it easier to write automated tests.
  • step_output_dir (str) – An empty/non-existent directory where Hadoop should put output from all steps other than the last one (this only matters for multi-step jobs). Currently ignored by local runners.
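
The effect of sort_values (secondary sort) can be illustrated with plain Python. This is a conceptual sketch of what Hadoop's shuffle achieves when values are folded into the sort, not mrjob's actual implementation:

```python
# Conceptual sketch of secondary sort (sort_values=True): Hadoop sorts
# mapper output by the full encoded line, so within each key the values
# arrive at the reducer in sorted (encoded) order.
mapper_output = [
    (b'fruit', b'pear'),
    (b'veg', b'kale'),
    (b'fruit', b'apple'),
]

# Sorting (key, value) pairs groups each key together and orders its
# values by their encoded form.
shuffled = sorted(mapper_output)

print(shuffled)
# [(b'fruit', b'apple'), (b'fruit', b'pear'), (b'veg', b'kale')]
```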

Running your job

MRJobRunner.run()

Run the job, and block until it finishes.

Raise StepFailedException if there are any problems (except on InlineMRJobRunner, where we raise the actual exception that caused the step to fail).

MRJobRunner.stream_output()

Stream raw lines from the job’s output. You can parse these using the read() method of the appropriate HadoopStreamingProtocol class.
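
For illustration, here is a stdlib-only sketch of what the read() method of a JSON-based protocol does with one of these raw lines, assuming the common case of tab-separated, JSON-encoded key/value pairs. Real code should use the protocol class itself:

```python
import json

def read_json_line(line):
    # A raw output line is a tab-separated, JSON-encoded key/value
    # pair in the common default case. Split on the first tab and
    # decode both halves.
    key, _, value = line.rstrip(b'\r\n').partition(b'\t')
    return json.loads(key), json.loads(value)

print(read_json_line(b'"word"\t3\n'))  # ('word', 3)
```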

MRJobRunner.cleanup(mode=None)

Clean up running jobs, temp files, and logs, subject to the cleanup option passed to the constructor.

If you create your runner in a with block, cleanup() will be called automatically:

with mr_job.make_runner() as runner:
    ...

# cleanup() called automatically here

Parameters:
  • mode – override the cleanup option passed into the constructor. Should be a list of strings from CLEANUP_CHOICES.

mrjob.runner.CLEANUP_CHOICES = ['ALL', 'CLOUD_TMP', 'CLUSTER', 'HADOOP_TMP', 'JOB', 'LOCAL_TMP', 'LOGS', 'NONE', 'TMP']

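
As an illustration of how these choices are used, a hypothetical validator (check_cleanup_mode is not part of mrjob) for a mode list might look like:

```python
CLEANUP_CHOICES = ['ALL', 'CLOUD_TMP', 'CLUSTER', 'HADOOP_TMP',
                   'JOB', 'LOCAL_TMP', 'LOGS', 'NONE', 'TMP']

def check_cleanup_mode(mode):
    # Hypothetical validator: every entry must come from CLEANUP_CHOICES
    bad = [m for m in mode if m not in CLEANUP_CHOICES]
    if bad:
        raise ValueError('invalid cleanup choice(s): %r' % bad)
    return mode

print(check_cleanup_mode(['LOGS', 'TMP']))  # ['LOGS', 'TMP']
```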

Run Information

MRJobRunner.counters()

Get counters associated with this run in this form:

[{'group name': {'counter1': 1, 'counter2': 2}},
 {'group name': ...}]

The list contains an entry for every step of the current job.
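
As a consumer-side sketch (total_counters is a hypothetical helper, not part of mrjob), here is how you might sum this structure across steps:

```python
from collections import Counter

def total_counters(steps):
    # Sum counter values across every step of a job, given the
    # list-of-dicts structure that counters() returns.
    totals = {}
    for step in steps:
        for group, counters in step.items():
            totals.setdefault(group, Counter()).update(counters)
    return {group: dict(c) for group, c in totals.items()}

steps = [
    {'group name': {'counter1': 1, 'counter2': 2}},
    {'group name': {'counter1': 10}},
]
print(total_counters(steps))
# {'group name': {'counter1': 11, 'counter2': 2}}
```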

MRJobRunner.get_hadoop_version()

Return the version number of the Hadoop environment as a string if Hadoop is being used or simulated. Return None if not applicable.

EMRJobRunner infers this from the cluster. HadoopJobRunner gets this from hadoop version. LocalMRJobRunner has an additional hadoop_version option to specify which version it simulates. InlineMRJobRunner does not simulate Hadoop at all.

Configuration

MRJobRunner.get_opts()

Get options set for this runner, as a dict.

File management

MRJobRunner.fs

Filesystem object for the local filesystem.

class mrjob.fs.base.Filesystem

Some simple filesystem operations that are common across the local filesystem, S3, HDFS, and remote machines via SSH. Different runners provide functionality for different filesystems via their fs attribute. The hadoop and emr runners provide support for multiple protocols using CompositeFilesystem.

Protocol support:

  • mrjob.fs.hadoop.HadoopFilesystem: hdfs://, others
  • mrjob.fs.local.LocalFilesystem: /
  • mrjob.fs.s3.S3Filesystem: s3://bucket/path, s3n://bucket/path
  • mrjob.fs.ssh.SSHFilesystem: ssh://hostname/path
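
A crude sketch of how a path might be routed to one of these filesystems by URI scheme (an illustration of the idea, not CompositeFilesystem's actual dispatch logic):

```python
from urllib.parse import urlparse

# Illustrative mapping from URI scheme to filesystem; a bare path
# (no scheme) belongs to the local filesystem.
SCHEME_TO_FS = {
    '': 'LocalFilesystem',
    'hdfs': 'HadoopFilesystem',
    's3': 'S3Filesystem',
    's3n': 'S3Filesystem',
    'ssh': 'SSHFilesystem',
}

def fs_for_path(path):
    # Pick a filesystem name based on the path's URI scheme;
    # unknown schemes fall through to HadoopFilesystem ("others")
    return SCHEME_TO_FS.get(urlparse(path).scheme, 'HadoopFilesystem')

print(fs_for_path('s3://bucket/path'))  # S3Filesystem
print(fs_for_path('/tmp/output'))       # LocalFilesystem
```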
can_handle_path(path)

Can we handle this path at all?

cat(path_glob)

cat all files matching path_glob, decompressing if necessary
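
For the local case, the decompression behavior can be sketched with the stdlib (an illustration of the documented behavior, not mrjob's code; only gzip and bzip2 are handled here):

```python
import bz2
import gzip

def open_decompressed(path):
    # Mimic cat()'s documented behavior: pick a decompressor
    # based on the file extension, defaulting to a plain open.
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')
    if path.endswith('.bz2'):
        return bz2.open(path, 'rb')
    return open(path, 'rb')
```

With a .gz file on disk, open_decompressed() yields the uncompressed bytes transparently.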

du(path_glob)

Get the total size of files matching path_glob

Corresponds roughly to: hadoop fs -du path_glob

exists(path_glob)

Does the given path/URI exist?

Corresponds roughly to: hadoop fs -test -e path_glob

join(path, *paths)

Join paths onto path (which may be a URI)

ls(path_glob)

Recursively list all files in the given path.

We don’t return directories for compatibility with S3 (which has no concept of them)

Corresponds roughly to: hadoop fs -ls -R path_glob
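
On the local filesystem, these semantics (recursive, files only) look roughly like the following sketch (not mrjob's implementation; glob handling is omitted):

```python
import os

def local_ls(path):
    # Recursively yield files only -- never directories -- matching
    # the documented ls() semantics (S3 has no directory concept).
    if os.path.isfile(path):
        yield path
        return
    for dirpath, _, filenames in os.walk(path):
        for name in filenames:
            yield os.path.join(dirpath, name)
```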

md5sum(path_glob)

Generate the md5 sum of the file at path_glob

mkdir(path)

Create the given dir and its subdirs (if they don’t already exist).

Corresponds roughly to: hadoop fs -mkdir -p path

rm(path_glob)

Recursively delete the given file/directory, if it exists

Corresponds roughly to: hadoop fs -rm -R path_glob

touchz(path)

Make an empty file in the given location. Raises an error if a non-zero length file already exists in that location.

Corresponds to: hadoop fs -touchz path
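
For the local case, the documented semantics can be sketched as follows (an illustration, not mrjob's implementation):

```python
import os

def local_touchz(path):
    # Create an empty file, but refuse to clobber a non-empty one,
    # matching the documented touchz() behavior.
    if os.path.exists(path) and os.path.getsize(path) > 0:
        raise OSError('non-zero length file exists: %s' % path)
    with open(path, 'ab'):
        pass
```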