mrjob.runner - base class for all runners
class mrjob.runner.MRJobRunner(mr_job_script=None, conf_paths=None, extra_args=None, hadoop_input_format=None, hadoop_output_format=None, input_paths=None, output_dir=None, partitioner=None, sort_values=None, stdin=None, steps=None, step_output_dir=None, **opts)

    Abstract base class for all runners.

MRJobRunner.__init__(mr_job_script=None, conf_paths=None, extra_args=None, hadoop_input_format=None, hadoop_output_format=None, input_paths=None, output_dir=None, partitioner=None, sort_values=None, stdin=None, steps=None, step_output_dir=None, **opts)

    All runners take the following keyword arguments:
    Parameters:
        - mr_job_script (str) – the path of the .py file containing the MRJob. If this is None, you won't actually be able to run() the job, but other utilities (e.g. ls()) will work.
        - conf_paths (None or list) – list of config files to combine and use, or None to search for mrjob.conf in the default locations.
        - extra_args (list of str) – a list of extra command-line arguments to pass to the mr_job script. This is a hook to allow jobs to take additional arguments.
        - hadoop_input_format (str) – name of an optional Hadoop InputFormat class. Passed to Hadoop along with your first step with the -inputformat option. Note that if you write your own class, you'll need to include it in your own custom streaming jar (see hadoop_streaming_jar).
        - hadoop_output_format (str) – name of an optional Hadoop OutputFormat class. Passed to Hadoop along with your last step with the -outputformat option. Note that if you write your own class, you'll need to include it in your own custom streaming jar (see hadoop_streaming_jar).
        - input_paths (list of str) – input files for your job. Supports globs and recursively walks directories (e.g. ['data/common/', 'data/training/*.gz']). If this is left blank, we'll read from stdin.
        - output_dir (str) – an empty/non-existent directory where Hadoop should put the final output from the job. If you don't specify an output directory, we'll output into a subdirectory of this job's temporary directory. You can control this from the command line with --output-dir. This option cannot be set from configuration files. If used with the hadoop runner, this path does not need to be fully qualified with hdfs:// URIs because it's understood that it has to be on HDFS.
        - partitioner (str) – optional name of a Hadoop partitioner class, e.g. 'org.apache.hadoop.mapred.lib.HashPartitioner'. Hadoop streaming will use this to determine how mapper output should be sorted and distributed to reducers.
        - sort_values (bool) – if true, set partitioners and jobconf variables so that reducers receive the values associated with any key in sorted order (sorted by their encoded value). Also known as secondary sort.
        - stdin – an iterable (can be a BytesIO or even a list) to use as stdin. This is a hook for testing; if you set stdin via sandbox(), it'll get passed through to the runner. If for some reason your lines are missing newlines, we'll add them; this makes it easier to write automated tests.
        - steps – a list of descriptions of steps to run (see mrjob.step - represent Job Steps for description formats)
        - step_output_dir (str) – an empty/non-existent directory where Hadoop should put output from all steps other than the last one (this only matters for multi-step jobs). Currently ignored by local runners.
Running your job
MRJobRunner.run()

    Run the job, and block until it finishes.

    Raises StepFailedException if there are any problems (except on InlineMRJobRunner, where we raise the actual exception that caused the step to fail).
MRJobRunner.cat_output()

    Stream the job's output, as a stream of bytes. If there are multiple output files, there will be an empty bytestring (b'') between them.

    Like Hadoop input formats, we ignore files and subdirectories whose names start with "_" or "." (e.g. _SUCCESS, _logs/, .part-00000.crc).

    Changed in version 0.6.8: Ignore file/dirnames starting with "." as well as "_".
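Since the stream marks the boundary between output files with an empty bytestring, a consumer can regroup it into per-file chunks. A minimal stdlib-only sketch (the helper name split_output_files and the sample data are my own, not part of mrjob's API):

```python
def split_output_files(chunks):
    """Regroup a cat_output()-style byte stream into one bytestring
    per output file, using the empty-bytestring separators."""
    current = []
    for chunk in chunks:
        if chunk == b'':
            # b'' marks the boundary between two output files
            yield b''.join(current)
            current = []
        else:
            current.append(chunk)
    if current:  # the last file has no trailing separator
        yield b''.join(current)

# hypothetical stream covering two part files
files = list(split_output_files([b'1\t"a"\n', b'', b'2\t"b"\n']))
```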
MRJobRunner.cleanup(mode=None)

    Clean up running jobs, temp files, and logs, subject to the cleanup option passed to the constructor.

    If you create your runner in a with block, cleanup() will be called automatically:

        with mr_job.make_runner() as runner:
            ...
            # cleanup() called automatically here

    Parameters: mode – override the cleanup option passed into the constructor. Should be a list of strings from CLEANUP_CHOICES.
mrjob.options.CLEANUP_CHOICES = ['ALL', 'CLOUD_TMP', 'CLUSTER', 'HADOOP_TMP', 'JOB', 'LOCAL_TMP', 'LOGS', 'NONE', 'TMP']

    cleanup options:

        - 'ALL': delete logs and local and remote temp files; stop cluster if on EMR and the job is not done when cleanup is run
        - 'CLOUD_TMP': delete temp files on cloud storage (e.g. S3) only
        - 'CLUSTER': terminate the cluster if on EMR and the job is not done on cleanup
        - 'HADOOP_TMP': delete temp files on HDFS only
        - 'JOB': stop job if on EMR and the job is not done when cleanup runs
        - 'LOCAL_TMP': delete local temp files only
        - 'LOGS': delete logs only
        - 'NONE': delete nothing
        - 'TMP': delete local, HDFS, and cloud storage temp files, but not logs
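Since mode must be a list of strings drawn from CLEANUP_CHOICES, validating it is a simple set check. A hypothetical sketch (check_cleanup is not mrjob's actual validation code):

```python
CLEANUP_CHOICES = ['ALL', 'CLOUD_TMP', 'CLUSTER', 'HADOOP_TMP', 'JOB',
                   'LOCAL_TMP', 'LOGS', 'NONE', 'TMP']

def check_cleanup(mode):
    """Raise ValueError if mode contains anything outside CLEANUP_CHOICES."""
    invalid = set(mode) - set(CLEANUP_CHOICES)
    if invalid:
        raise ValueError('invalid cleanup options: %s' % sorted(invalid))
    return mode
```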
Run Information
MRJobRunner.counters()

    Get counters associated with this run in this form:

        [{'group name': {'counter1': 1, 'counter2': 2}},
         {'group name': ...}]

    The list contains an entry for every step of the current job.
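Given that shape, totaling a counter across all steps is a small fold over the list. A sketch, assuming a made-up two-step job (the group and counter names below are illustrative, not from a real run):

```python
from collections import defaultdict

# made-up counters() output for a two-step job
counters = [
    {'group name': {'counter1': 1, 'counter2': 2}},
    {'group name': {'counter1': 5}},
]

# sum each (group, counter) pair across all steps
totals = defaultdict(int)
for step_counters in counters:
    for group, counts in step_counters.items():
        for name, value in counts.items():
            totals[(group, name)] += value
```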
MRJobRunner.get_hadoop_version()

    Return the version number of the Hadoop environment as a string if Hadoop is being used or simulated. Return None if not applicable.

    EMRJobRunner infers this from the cluster. HadoopJobRunner gets this from hadoop version. LocalMRJobRunner has an additional hadoop_version option to specify which version it simulates. InlineMRJobRunner does not simulate Hadoop at all.
MRJobRunner.get_job_key()

    Get the unique key for the job run by this runner. This has the format label.owner.date.time.microseconds.
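A key in that format can be split back into its five fields; splitting from the right keeps any dots in the label intact, since the last four fields are fixed. A sketch (parse_job_key and the sample key are hypothetical, not part of mrjob's API):

```python
def parse_job_key(job_key):
    """Split a job key of the form label.owner.date.time.microseconds."""
    label, owner, date, time, microseconds = job_key.rsplit('.', 4)
    return {'label': label, 'owner': owner, 'date': date,
            'time': time, 'microseconds': microseconds}

# hypothetical key for a job run by user "dave"
key = parse_job_key('mr_word_count.dave.20230101.123456.012345')
```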
File management
MRJobRunner.fs

    Filesystem object for the local filesystem.
class mrjob.fs.base.Filesystem

    Some simple filesystem operations that are common across the local filesystem, S3, GCS, HDFS, and remote machines via SSH.

    Different runners provide functionality for different filesystems via their fs attribute. Generally a runner will wrap one or more filesystems with mrjob.fs.composite.CompositeFilesystem.

    Schemes supported:

        - mrjob.fs.gcs.GCSFilesystem: gs://
        - mrjob.fs.hadoop.HadoopFilesystem: hdfs:// and other URIs
        - mrjob.fs.local.LocalFilesystem: paths and file:// URIs
        - mrjob.fs.s3.S3Filesystem: s3://, s3a://, s3n://
        - mrjob.fs.ssh.SSHFilesystem: ssh://

    Changed in version 0.6.12: LocalFilesystem added support for file:// URIs.
    can_handle_path(path)

        Can we handle this path at all?
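A composite wrapper can use can_handle_path() to pick a filesystem per path. A simplified sketch of that dispatch, not CompositeFilesystem's actual code (PrefixFilesystem is a stand-in for real filesystem classes):

```python
class PrefixFilesystem:
    """Stand-in filesystem that claims paths starting with a prefix."""
    def __init__(self, prefix):
        self.prefix = prefix

    def can_handle_path(self, path):
        return path.startswith(self.prefix)

class CompositeFS:
    """Dispatch each path to the first filesystem that can handle it."""
    def __init__(self, *filesystems):
        self.filesystems = filesystems

    def fs_for_path(self, path):
        for fs in self.filesystems:
            if fs.can_handle_path(path):
                return fs
        raise IOError('no filesystem can handle %s' % path)

s3, local = PrefixFilesystem('s3://'), PrefixFilesystem('/')
fs = CompositeFS(s3, local)
```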
    cat(path_glob)

        cat all files matching path_glob, decompressing if necessary.

        This yields bytes, which don't necessarily correspond to lines (see #1544). If multiple files are catted, yields b'' between each file.
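Because chunks don't align with lines, callers that want lines have to buffer across chunk boundaries. A stdlib-only sketch of that idea (mrjob ships a similar helper, mrjob.util.to_lines; this version is illustrative, not its actual code):

```python
def to_lines(chunks):
    """Buffer a stream of byte chunks and yield complete newline-terminated
    lines, plus any unterminated trailing data at the end."""
    buf = b''
    for chunk in chunks:
        buf += chunk
        while b'\n' in buf:
            line, buf = buf.split(b'\n', 1)
            yield line + b'\n'
    if buf:
        yield buf

# a line split across chunks is reassembled
lines = list(to_lines([b'1\t"a', b'pple"\n2\t', b'"banana"\n']))
```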
    du(path_glob)

        Get the total size of files matching path_glob.

        Corresponds roughly to: hadoop fs -du path_glob
    exists(path_glob)

        Does the given path/URI exist?

        Corresponds roughly to: hadoop fs -test -e path_glob
    join(path, *paths)

        Join paths onto path (which may be a URI).
    ls(path_glob)

        Recursively list all files in the given path.

        We don't return directories, for compatibility with S3 (which has no concept of them).

        Corresponds roughly to: hadoop fs -ls -R path_glob
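For a local path, the files-only contract maps directly onto os.walk. A sketch of what such a listing might look like (not mrjob's actual implementation, and without glob handling):

```python
import os

def ls_files(path):
    """Recursively yield file paths under path, never directories."""
    for dirpath, dirnames, filenames in os.walk(path):
        for name in filenames:
            yield os.path.join(dirpath, name)
```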
    md5sum(path)

        Generate the md5 sum of the file at path.
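On the local filesystem this is the usual hashlib pattern, reading in blocks so large files don't have to fit in memory. A sketch (not mrjob's actual code):

```python
import hashlib

def md5sum(path):
    """Hex md5 digest of the file at path, read in 64 KiB blocks."""
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for block in iter(lambda: f.read(65536), b''):
            digest.update(block)
    return digest.hexdigest()
```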
    mkdir(path)

        Create the given dir and its subdirs (if they don't already exist). On cloud filesystems (e.g. S3), also create the corresponding bucket as needed.

        Corresponds roughly to: hadoop fs -mkdir -p path

        New in version 0.6.8: creates buckets on cloud filesystems.
    put(src, path)

        Upload a file on the local filesystem (src) to path. As with shutil.copyfile(), path should be the full path of the new file, not a directory which should contain it.

        Corresponds roughly to: hadoop fs -put src path

        New in version 0.6.8.
    rm(path_glob)

        Recursively delete the given file/directory, if it exists.

        Corresponds roughly to: hadoop fs -rm -R path_glob
    touchz(path)

        Make an empty file in the given location. Raises an error if a non-zero length file already exists in that location.

        Corresponds to: hadoop fs -touchz path
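On the local filesystem those semantics amount to a size check plus creating the file without truncating it. A sketch (again, not mrjob's implementation):

```python
import os

def touchz(path):
    """Create an empty file; fail if a non-empty file is already there."""
    if os.path.exists(path) and os.path.getsize(path) > 0:
        raise OSError('non-zero length file exists: %s' % path)
    with open(path, 'ab'):
        pass  # append mode creates the file but never truncates it
```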