Spark¶
Why use mrjob with Spark?¶
mrjob augments Spark's native Python support with the following features familiar to users of mrjob:
- automatically upload input and other support files to HDFS, GCS, or S3 (see upload_files, upload_archives, and py_files)
- run make and other commands before running Spark tasks (see setup)
- passthrough and file arguments (see Defining command line options)
- automatically parse logs to explain errors and other Spark job failures
- easily pass through environment variables (see cmdenv)
- support for libjars
- automatic matching of Python version (see python_bin)
- automatically set up Spark on EMR (see bootstrap_spark)
- automatically make the mrjob library available to your job (see bootstrap_mrjob)
mrjob spark-submit¶
If you already have a Spark script written, the easiest way to access mrjob’s features is to run your job with mrjob spark-submit, just like you would normally run it with spark-submit. This can, for instance, make running a Spark job on EMR as easy as running it locally, or allow you to access features (e.g. setup) not natively supported by Spark.
For more details, see mrjob spark-submit.
Writing your first Spark MRJob¶
Another way to integrate mrjob with Spark is to add a spark() method to your MRJob class, and put your Spark code inside it. This will allow you to access features only available to MRJobs (e.g. FILES).
Here’s how you’d implement a word frequency count job in Spark:
import re

from operator import add

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRSparkWordcount(MRJob):

    def spark(self, input_path, output_path):
        # Spark may not be available where script is launched
        from pyspark import SparkContext

        sc = SparkContext(appName='mrjob Spark wordcount script')

        lines = sc.textFile(input_path)

        counts = (
            lines.flatMap(self.get_words)
            .map(lambda word: (word, 1))
            .reduceByKey(add))

        counts.saveAsTextFile(output_path)

        sc.stop()

    def get_words(self, line):
        return WORD_RE.findall(line)


if __name__ == '__main__':
    MRSparkWordcount.run()
Since Spark already supports Python, mrjob takes care of setting up your cluster, passes in input and output paths, and otherwise gets out of the way. If you pass in multiple input paths, input_path will be these paths joined by a comma (SparkContext.textFile() will accept this).
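As a plain-Python illustration of how that joining works (illustrative only, not mrjob's internals; the bucket and file names are placeholders):

```python
# multiple input paths reach spark() as a single comma-joined string
paths = ['s3://bucket/a.txt', 's3://bucket/b.txt', 's3://bucket/c.txt']
input_path = ','.join(paths)
print(input_path)  # s3://bucket/a.txt,s3://bucket/b.txt,s3://bucket/c.txt
```

SparkContext.textFile() accepts such comma-separated path lists directly.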
Note that pyspark is imported inside the spark() method. This allows your job to run whether pyspark is installed locally or not.
The spark() method can be used to execute arbitrary code, so there's nothing stopping you from using SparkSession instead of SparkContext in Spark 2, or writing a streaming-mode job rather than a batch one.
Warning
Prior to v0.6.8, to pass job methods into Spark (e.g. rdd.flatMap(self.get_words)), you first had to call self.sandbox(); otherwise Spark would error because self was not serializable.
Running on your Spark cluster¶
By default, mrjob runs your job on the inline runner (see below).
If you want to run your job on your own Spark cluster, run it with -r spark:
python your_mr_spark_job.py -r spark input > output
Use --spark-master (see spark_master) to control where your job runs. You can pass in Spark options with -D (see jobconf) and set deploy mode (client or cluster) with --spark-deploy-mode. If you need to pass other arguments to spark-submit, use spark_args.
The Spark runner can also run "classic" MRJobs (i.e. those made by defining mapper() etc. or with MRSteps) directly on Spark, allowing you to move off Hadoop without rewriting your jobs. See below for details.
Warning
If you don't set spark_master, your job will run on Spark's default local[*] master, which can't handle setup scripts or --files because it doesn't give tasks their own working directory.
Note
mrjob needs to know what master and deploy mode you're using, so it will override attempts to set the Spark master or deploy mode through jobconf (e.g. -D spark.master=...).
Using remote filesystems other than HDFS¶
By default, if you use a remote Spark master (i.e. not local or local-cluster), Spark will assume you want to use HDFS for your job's temp space, and that you will want to access it through hadoop fs.
Some Spark installations don't use HDFS at all. Fortunately, the Spark runner also supports S3 and GCS. Use spark_tmp_dir to specify a remote temp directory not on HDFS (e.g. --spark-tmp-dir s3a://bucket/path).
For more information on accessing S3 or GCS, see Configuring AWS credentials (S3) or Configuring Google Cloud credentials (GCS).
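If you always use the same remote filesystem, you could set this option in mrjob.conf instead of on the command line; a sketch (the bucket name and path are placeholders):

```yaml
runners:
  spark:
    spark_tmp_dir: s3a://your-bucket/tmp/mrjob
```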
Other ways to run on Spark¶
Inline runner¶
Running your Spark job with -r inline (the default) will launch it directly through the pyspark library, effectively running it on the local[*] master. This is convenient for debugging because exceptions will bubble up directly to your Python process.
The inline runner also builds a simulated working directory for your job, making it possible to test scripts that rely on certain files being in the working directory (it doesn’t run setup scripts).
Note
If you don't have a local Spark installation, the pyspark library on PyPI is a pretty quick way to get one (pip install pyspark).
Local runner¶
Running your Spark job with -r local will launch it through spark-submit on a local-cluster master. local-cluster is designed to simulate a real Spark cluster, so setup will work as expected.
By default, the local runner launches Spark jobs with as many executors as your system has CPUs. Use --num-cores (see num_cores) to change this.
By default, the local runner gives each executor 1 GB of memory. If you need more, you can specify it through jobconf, e.g. -D spark.executor.memory=4g.
EMR runner¶
Running your Spark job with -r emr
will launch it in Amazon Elastic
MapReduce (EMR), with the same seamless integration and features mrjob provides
for Hadoop jobs on EMR.
The EMR runner will always run your job on the yarn Spark master in cluster deploy mode.
Hadoop runner¶
Running your Spark job with -r hadoop will launch it on your own Hadoop cluster. This is not significantly different from the Spark runner. The main advantage of the Hadoop runner is that it has more knowledge about how to find logs, so it can be better at finding the relevant error if your job fails.
Unlike the Spark runner, the Hadoop runner's default Spark master is yarn.
Note
mrjob does not yet support Spark on Google Cloud Dataproc.
Passing in libraries¶
Use --py-files to pass in .zip or .egg files full of Python code:
python your_mr_spark_job.py -r hadoop --py-files lib1.zip,lib2.egg
Or set py_files in mrjob.conf.
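For example, the mrjob.conf equivalent might look like this (a sketch; the archive names are placeholders):

```yaml
runners:
  hadoop:
    py_files:
    - lib1.zip
    - lib2.egg
```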
Command-line options¶
Command-line options (passthrough options, etc.) work exactly like they do with regular streaming jobs (even add_file_arg(), except on the local[*] Spark master, which has no working directory). See Defining command line options.
Uploading files to the working directory¶
upload_files, FILES, and files uploaded via setup scripts should all work as expected (except on local masters, because there is no working directory).
Note that you can give files a different name in the working directory (e.g. --files foo#bar) on all Spark masters, even though Spark treats that as a YARN-specific feature.
Archives and directories¶
Spark treats --archives as a YARN-specific feature. This means that upload_archives, ARCHIVES, DIRS, etc. will be ignored on non-yarn Spark masters.
Future versions of mrjob may simulate archives on non-yarn masters using a setup script.
Multi-step jobs¶
There generally isn't a need to define multiple Spark steps (Spark lets you map/reduce as many times as you want). However, it may sometimes be useful to pre- or post-process Spark data using a streaming or jar step.
This is accomplished by overriding your job's steps() method and using the SparkStep class:
def steps(self):
    # MRStep and SparkStep come from mrjob.step
    return [
        MRStep(mapper=self.preprocessing_mapper),
        SparkStep(spark=self.spark),
    ]
External Spark scripts¶
mrjob can also be used to launch external (non-mrjob) Spark scripts using the SparkScriptStep class, which specifies the path (or URI) of the script and its arguments.
As with JarSteps, you can interpolate input and output paths using the INPUT and OUTPUT constants. For example, you could set your job's steps() method up like this:
def steps(self):
    return [
        SparkScriptStep(
            script=os.path.join(
                os.path.dirname(__file__), 'my_spark_script.py'),
            args=[INPUT, '-o', OUTPUT, '--other-switch'],
        ),
    ]
Custom input and output formats¶
mrjob allows you to use input and output formats from custom JARs with Spark, just like you can with streaming jobs.
First download your JAR to the same directory as your job, and add it to your job class with the LIBJARS attribute:
LIBJARS = ['nicknack-1.0.0.jar']
Then use Spark's own capabilities to reference your input or output format, keeping in mind the data types they expect.
For example, nicknack's MultipleValueOutputFormat expects <Text,Text>, so if we wanted to integrate it with our wordcount example, we'd have to convert the count to a string:
def spark(self, input_path, output_path):
    from pyspark import SparkContext

    sc = SparkContext(appName='mrjob Spark wordcount script')

    lines = sc.textFile(input_path)

    counts = (
        lines.flatMap(self.get_words)
        .map(lambda word: (word, 1))
        .reduceByKey(add))

    # MultipleValueOutputFormat expects Text, Text
    # w_c is (word, count)
    counts = counts.map(lambda w_c: (w_c[0], str(w_c[1])))

    counts.saveAsHadoopFile(output_path,
                            'nicknack.MultipleValueOutputFormat')

    sc.stop()
Running “classic” MRJobs on Spark¶
The Spark runner provides near-total support for running "classic" MRJobs (the sort described in Writing your first job and Writing your second job) directly on any Spark installation, even though these jobs were originally designed to run on Hadoop Streaming. The subsections below detail what is and isn't supported.
Jobs will often run more quickly on Spark than on Hadoop Streaming, so it's worth trying even if you don't plan to move off Hadoop in the foreseeable future.
Multiple steps are run as a single job¶
If you have a job with multiple consecutive MRSteps, the Spark runner will run them all as a single Spark job. This is usually what you want (it's more efficient), but it can make debugging slightly more challenging (step failure exceptions give a range of steps, and there is no way to access intermediate data).
To force the Spark runner to run steps separately, you can initialize each MRStep with a different jobconf dictionary.
No support for subprocesses¶
Pre-filters (e.g. mapper_pre_filter()) and command steps (e.g. reducer_cmd()) are not supported because they require launching subprocesses.
It wouldn’t be impossible to emulate this inside Spark, but then we’d essentially be turning Spark into Hadoop Streaming. (If you have a use case for this seemingly implausible feature, let us know through GitHub.)
Spark loves combiners¶
Hadoop’s “reduce” paradigm is a lot more heavyweight than Spark’s; whereas a Spark reducer just wants to know how to combine two values into one, a Hadoop reducer expects to be able to see all the values for a given key, and to emit zero or more key-value pairs.
In fact, Spark reducers are a lot more like Hadoop combiners. The Spark runner knows how to translate something like:
def combiner(self, key, values):
    yield key, sum(values)
into Spark's reduce paradigm: basically, it'll pass your combiner two values at a time and hope it emits one. If your combiner does not behave like a Spark reducer function (emitting multiple or zero values), the Spark runner handles that gracefully as well.
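As a rough sketch of that translation in plain Python (illustrative only, not mrjob's actual implementation; the function names are made up for this example):

```python
from functools import reduce

def combiner(key, values):
    # a classic MRJob-style combiner: sees an iterable of values,
    # yields a single (key, value) pair
    yield key, sum(values)

def as_spark_reduce_fn(combiner, key):
    # wrap the combiner so it can be called on two values at a time,
    # the way Spark's reduceByKey calls its reduce function
    def reduce_fn(a, b):
        # feed the combiner exactly two values; unpack its single result
        (_, combined), = combiner(key, [a, b])
        return combined
    return reduce_fn

values = [1, 2, 3, 4]
result = reduce(as_spark_reduce_fn(combiner, 'word'), values)
print(result)  # 10
```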
Counter emulation is almost perfect¶
Counters (see increment_counter()) are a feature specific to Hadoop. mrjob emulates them on Spark anyway. If you have a multi-step job, mrjob will dutifully print out counters for each step and make them available through counters().
The only drawback is that while Hadoop has the ability to “take back” counters produced by a failed task, there isn’t a clean way to do this with Spark accumulators. Therefore, the counters produced by the Spark runner’s Hadoop emulation may be overestimates.
Spark does not stream data¶
While Hadoop streaming (as its name implies) passes a stream of data to your job, Spark instead operates on partitions, which are loaded into memory.
A reducer like this can't run out of memory on Hadoop Streaming, no matter how many values there are for a given key:
def reducer(self, key, values):
    yield key, sum(values)
However, on Spark, simply storing the partition that contains these values can cause Spark to run out of memory.
If this happens, you can let Spark use more memory (-D spark.executor.memory=10g) or add a combiner to your job.
Compression emulation¶
It’s fairly common for people to request compressed output from Hadoop via configuration properties, for example:
python mr_your_job.py \
    -D mapreduce.output.fileoutputformat.compress=true \
    -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec ...
This works with -r spark
too; the Spark runner knows how to recognize these
properties and pass the codec specified to Spark when it writes output.
Spark won’t split .gz files either¶
A common trick on Hadoop to ensure that segments of your data don’t get split between mappers is to gzip each segment (since .gz is not a seekable compression format).
This works on Spark as well.
Controlling number of output files¶
By default, Spark will write one output file per partition. This may give more output files than you expect, since Hadoop and Spark are tuned differently.
The Spark runner knows how to emulate the Hadoop configuration property that
sets number of reducers on Hadoop (e.g. -D mapreduce.job.reduces=100
),
which will control the number of output files (assuming your last step has a
reducer).
However, this is a somewhat heavyweight solution; once Spark runs a step’s reducer, mrjob has to forbid Spark from re-partitioning until the end of the step.
A lighter-weight solution is --max-output-files, which allows you to limit the number of output files by running coalesce() just before writing output. Running your job with --max-output-files=100 would ensure it produces no more than 100 output files (but it could produce fewer).
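coalesce() is lighter-weight because it merges existing partitions rather than reshuffling all the data. A rough plain-Python sketch of the idea (not Spark's actual algorithm, which groups neighboring partitions to preserve locality):

```python
def coalesce(partitions, n):
    """Naively merge a list of partitions down to at most n partitions."""
    n = min(n, len(partitions))
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # round-robin assignment of old partitions to new ones;
        # no element ever moves between "machines" individually
        merged[i % n].extend(part)
    return merged

parts = [[i] for i in range(8)]   # 8 one-element partitions
print(len(coalesce(parts, 3)))    # 3 partitions -> 3 output files
```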
Running classic MRJobs on Spark on EMR¶
It’s often faster to run classic MRJobs on Spark than Hadoop Streaming. It’s also convenient to be able to run on EMR rather than setting up your own Spark cluster (or SSH’ing in).
Can you do both? Yes! Run the job with the Spark runner, but tell it to use mrjob spark-submit to launch Spark jobs on EMR.
It looks something like this:
python mr_your_job.py -r spark \
--spark-submit-bin 'mrjob spark-submit -r emr' \
--spark-master yarn --spark-tmp-dir s3://your-bucket/tmp/ input1 input2
Note that because the Spark runner itself doesn’t know the job is going to run on EMR, you have to give it a couple of hints so that it knows it’s running on YARN (--spark-master) and that it needs to use S3 as its temp space (--spark-tmp-dir).