Spark

Why use mrjob with Spark?

mrjob augments Spark’s native Python support with a number of features familiar to users of mrjob, described in the sections below.

mrjob spark-submit

If you already have a Spark script written, the easiest way to access mrjob’s features is to run your job with mrjob spark-submit, just like you would normally run it with spark-submit. This can, for instance, make running a Spark job on EMR as easy as running it locally, or allow you to access features (e.g. setup) not natively supported by Spark.
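For example, something like the following would run an existing Spark script on EMR (the script name and its arguments here are placeholders):

mrjob spark-submit -r emr your_spark_script.py input_uri output_uri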

For more details, see mrjob spark-submit.

Writing your first Spark MRJob

Another way to integrate mrjob with Spark is to add a spark() method to your MRJob class and put your Spark code inside it. This allows you to access features only available to MRJobs (e.g. FILES).

Here’s how you’d implement a word frequency count job in Spark:

import re
from operator import add

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRSparkWordcount(MRJob):

    def spark(self, input_path, output_path):
        # Spark may not be available where script is launched
        from pyspark import SparkContext

        sc = SparkContext(appName='mrjob Spark wordcount script')

        lines = sc.textFile(input_path)

        counts = (
            lines.flatMap(self.get_words)
            .map(lambda word: (word, 1))
            .reduceByKey(add))

        counts.saveAsTextFile(output_path)

        sc.stop()

    def get_words(self, line):
        return WORD_RE.findall(line)


if __name__ == '__main__':
    MRSparkWordcount.run()

Since Spark already supports Python, mrjob takes care of setting up your cluster, passes in input and output paths, and otherwise gets out of the way. If you pass in multiple input paths, input_path will be these paths joined by a comma (SparkContext.textFile() will accept this).

Note that pyspark is imported inside the spark() method. This allows your job to run whether pyspark is installed locally or not.

The spark() method can be used to execute arbitrary code, so there’s nothing stopping you from using SparkSession instead of SparkContext in Spark 2, or writing a streaming-mode job rather than a batch one.
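For instance, a spark() method built around SparkSession might look something like this minimal sketch, which just reads the input as text and writes it back out:

def spark(self, input_path, output_path):
    # pyspark may not be installed where the script is launched
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName(
        'mrjob SparkSession example').getOrCreate()

    # a DataFrame with a single string column named 'value'
    df = spark.read.text(input_path)
    df.write.text(output_path)

    spark.stop()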

Warning

Prior to v0.6.8, to pass job methods into Spark (e.g. rdd.flatMap(self.get_words)), you first had to call self.sandbox(); otherwise Spark would error because self was not serializable.

Running on your Spark cluster

By default, mrjob runs your job on the inline runner (see below). If you want to run your job on your own Spark cluster, run it with -r spark:
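python your_mr_spark_job.py -r spark input1 input2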

Use --spark-master (see spark_master) to control where your job runs.

You can pass in spark options with -D (see jobconf) and set deploy mode (client or cluster) with --spark-deploy-mode. If you need to pass other arguments to spark-submit, use spark_args.
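For example, a run that picks a master, deploy mode, and a Spark property might look like this (the master URL and memory setting are placeholders):

python your_mr_spark_job.py -r spark \
  --spark-master spark://your-spark-master:7077 \
  --spark-deploy-mode cluster \
  -D spark.executor.memory=4g \
  input1 input2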

The Spark runner can also run “classic” MRJobs (i.e. those made by defining mapper() etc. or with MRSteps) directly on Spark, allowing you to move off Hadoop without rewriting your jobs. See below for details.

Warning

If you don’t set spark_master, your job will run on Spark’s default local[*] master, which can’t handle setup scripts or --files because it doesn’t give tasks their own working directory.

Note

mrjob needs to know what master and deploy mode you’re using, so it will override attempts to set spark master or deploy mode through jobconf (e.g. -D spark.master=...).

Using remote filesystems other than HDFS

By default, if you use a remote Spark master (i.e. not local or local-cluster), the Spark runner will assume you want to use HDFS for your job’s temp space, and that you will want to access it through hadoop fs.

Some Spark installations don’t use HDFS at all. Fortunately, the Spark runner also supports S3 and GCS. Use spark_tmp_dir to specify a remote temp directory not on HDFS (e.g. --spark-tmp-dir s3a://bucket/path).

For more information on accessing S3 or GCS, see Configuring AWS credentials (S3) or Configuring Google Cloud credentials (GCS).

Other ways to run on Spark

Inline runner

Running your Spark job with -r inline (the default) will launch it directly through the pyspark library, effectively running it on the local[*] master. This is convenient for debugging because exceptions will bubble up directly to your Python process.

The inline runner also builds a simulated working directory for your job, making it possible to test scripts that rely on certain files being in the working directory (it doesn’t run setup scripts).

Note

If you don’t have a local Spark installation, the pyspark library on PyPI is a pretty quick way to get one (pip install pyspark).

Local runner

Running your Spark job with -r local will launch it through spark-submit on a local-cluster master. local-cluster is designed to simulate a real Spark cluster, so setup will work as expected.

By default, the local runner launches Spark jobs with as many executors as your system has CPUs. Use --num-cores (see num_cores) to change this.

By default, the local runner gives each executor 1 GB of memory. If you need more, you can specify it through jobconf, e.g. -D spark.executor.memory=4g.

EMR runner

Running your Spark job with -r emr will launch it in Amazon Elastic MapReduce (EMR), with the same seamless integration and features mrjob provides for Hadoop jobs on EMR.

The EMR runner will always run your job on the yarn Spark master in cluster deploy mode.

Hadoop runner

Running your Spark job with -r hadoop will launch it on your own Hadoop cluster. This is not significantly different from using the Spark runner. The main advantage of the Hadoop runner is that it knows more about how to find logs, so it can be better at surfacing the relevant error if your job fails.

Unlike the Spark runner, the Hadoop runner’s default spark master is yarn.

Note

mrjob does not yet support Spark on Google Cloud Dataproc.

Passing in libraries

Use --py-files to pass in .zip or .egg files full of Python code:

python your_mr_spark_job.py -r hadoop --py-files lib1.zip,lib2.egg

Or set py_files in mrjob.conf.

Command-line options

Command-line options (passthrough options, etc.) work exactly like they do with regular streaming jobs (even add_file_arg() on the local[*] Spark master). See Defining command line options.
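For example, a passthrough option could be added to the wordcount job above like this (the --min-count option is purely illustrative); its value would then be available as self.options.min_count inside spark():

def configure_args(self):
    super(MRSparkWordcount, self).configure_args()
    # hypothetical option; the parsed value appears on self.options.min_count
    self.add_passthru_arg('--min-count', type=int, default=1)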

Uploading files to the working directory

upload_files, FILES, and files uploaded via setup scripts should all work as expected (except on local masters, where there is no working directory).

Note that you can give files a different name in the working directory (e.g. --files foo#bar) on all Spark masters, even though Spark treats that as a YARN-specific feature.

Archives and directories

Spark treats --archives as a YARN-specific feature. This means that upload_archives, ARCHIVES, DIRS, etc. will be ignored on non-yarn Spark masters.

Future versions of mrjob may simulate archives on non-yarn masters using a setup script.

Multi-step jobs

There generally isn’t a need to define multiple Spark steps (Spark lets you map/reduce as many times as you want). However, it may sometimes be useful to pre- or post-process Spark data using a streaming or jar step.

This is accomplished by overriding your job’s steps() method and using the SparkStep class:

def steps(self):
    # MRStep and SparkStep can be imported from mrjob.step
    return [
        MRStep(mapper=self.preprocessing_mapper),
        SparkStep(spark=self.spark),
    ]

External Spark scripts

mrjob can also be used to launch external (non-mrjob) Spark scripts using the SparkScriptStep class, which specifies the path (or URI) of the script and its arguments.

As with JarSteps, you can interpolate input and output paths using INPUT and OUTPUT constants. For example, you could set your job’s steps() method up like this:

def steps(self):
    # SparkScriptStep, INPUT, and OUTPUT can be imported from mrjob.step
    return [
        SparkScriptStep(
            script=os.path.join(
                os.path.dirname(__file__), 'my_spark_script.py'),
            args=[INPUT, '-o', OUTPUT, '--other-switch'],
        ),
    ]

Custom input and output formats

mrjob allows you to use input and output formats from custom JARs with Spark, just like you can with streaming jobs.

First download your JAR to the same directory as your job, and add it to your job class with the LIBJARS attribute:

LIBJARS = ['nicknack-1.0.0.jar']

Then use Spark’s own capabilities to reference your input or output format, keeping in mind the data types they expect.

For example, nicknack’s MultipleValueOutputFormat expects <Text,Text>, so if we wanted to integrate it with our wordcount example, we’d have to convert the count to a string:

def spark(self, input_path, output_path):
    from pyspark import SparkContext

    sc = SparkContext(appName='mrjob Spark wordcount script')

    lines = sc.textFile(input_path)

    counts = (
        lines.flatMap(self.get_words)
        .map(lambda word: (word, 1))
        .reduceByKey(add))

    # MultipleValueOutputFormat expects Text, Text
    # w_c is (word, count)
    counts = counts.map(lambda w_c: (w_c[0], str(w_c[1])))

    counts.saveAsHadoopFile(output_path,
                            'nicknack.MultipleValueOutputFormat')

    sc.stop()

Running “classic” MRJobs on Spark

The Spark runner provides near-total support for running “classic” MRJobs (the sort described in Writing your first job and Writing your second job) directly on any Spark installation, even though these jobs were originally designed to run on Hadoop Streaming. The sections below describe the details of this support and where it differs from Hadoop Streaming.

Jobs will often run more quickly on Spark than on Hadoop Streaming, so it’s worth trying even if you don’t plan to move off Hadoop in the foreseeable future.

Multiple steps are run as a single job

If you have a job with multiple consecutive MRSteps, the Spark runner will run them all as a single Spark job. This is usually what you want (it’s more efficient), but it can make debugging slightly more challenging (a failure is attributed to a range of steps rather than a single one, and there is no way to access intermediate data).

To force the Spark runner to run steps separately, you can initialize each MRStep with a different jobconf dictionary.
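Here’s a sketch of what that might look like; the step methods and the jobconf keys/values are arbitrary placeholders, since all that matters is that the dictionaries differ:

def steps(self):
    return [
        MRStep(mapper=self.mapper_extract,
               reducer=self.reducer_count,
               jobconf={'my.job.phase': 'extract'}),
        MRStep(reducer=self.reducer_rank,
               jobconf={'my.job.phase': 'rank'}),
    ]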

No support for subprocesses

Pre-filters (e.g. mapper_pre_filter()) and command steps (e.g. reducer_cmd()) are not supported because they require launching subprocesses.

It wouldn’t be impossible to emulate this inside Spark, but then we’d essentially be turning Spark into Hadoop Streaming. (If you have a use case for this seemingly implausible feature, let us know through GitHub.)

Spark loves combiners

Hadoop’s “reduce” paradigm is a lot more heavyweight than Spark’s; whereas a Spark reducer just wants to know how to combine two values into one, a Hadoop reducer expects to be able to see all the values for a given key, and to emit zero or more key-value pairs.

In fact, Spark reducers are a lot more like Hadoop combiners. The Spark runner knows how to translate something like:

def combiner(self, key, values):
    yield key, sum(values)

into Spark’s reduce paradigm: basically, it’ll pass your combiner two values at a time and hope it emits exactly one. If your combiner does not behave like a Spark reducer function (it emits multiple values, or none), the Spark runner handles that gracefully as well.

Counter emulation is almost perfect

Counters (see increment_counter()) are a feature specific to Hadoop. mrjob emulates them on Spark anyway. If you have a multi-step job, mrjob will dutifully print out counters for each step and make them available through counters().

The only drawback is that while Hadoop has the ability to “take back” counters produced by a failed task, there isn’t a clean way to do this with Spark accumulators. Therefore, the counters produced by the Spark runner’s Hadoop emulation may be overestimates.
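For reference, counters are incremented from your job methods exactly as they would be on Hadoop (the group and counter names below are arbitrary):

def mapper(self, _, line):
    # with -r spark, this counter is backed by a Spark accumulator
    self.increment_counter('profiling', 'lines_read', 1)
    yield None, line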

Spark does not stream data

While Hadoop streaming (as its name implies) passes a stream of data to your job, Spark instead operates on partitions, which are loaded into memory.

A reducer like this can’t run out of memory on Hadoop streaming, no matter how many values there are for key:

def reducer(self, key, values):
    yield key, sum(values)

However, on Spark, simply storing the partition that contains these values can cause Spark to run out of memory.

If this happens, you can let Spark use more memory (-D spark.executor.memory=10g) or add a combiner to your job.
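For a summing reducer like the one above, the combiner can use the same logic, so each partition’s values are collapsed before they ever reach the reducer:

def combiner(self, key, values):
    # pre-sum within each partition to keep the data per key small
    yield key, sum(values)

def reducer(self, key, values):
    yield key, sum(values)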

Compression emulation

It’s fairly common for people to request compressed output from Hadoop via configuration properties, for example:

python mr_your_job.py \
  -D mapreduce.output.fileoutputformat.compress=true \
  -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.BZip2Codec \
  ...

This works with -r spark too; the Spark runner knows how to recognize these properties and pass the codec specified to Spark when it writes output.

Spark won’t split .gz files either

A common trick on Hadoop to ensure that segments of your data don’t get split between mappers is to gzip each segment (since .gz is not a seekable compression format).

This works on Spark as well.

Controlling number of output files

By default, Spark will write one output file per partition. This may give more output files than you expect, since Hadoop and Spark are tuned differently.

The Spark runner knows how to emulate the Hadoop configuration property that sets number of reducers on Hadoop (e.g. -D mapreduce.job.reduces=100), which will control the number of output files (assuming your last step has a reducer).
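For example, to get 100 output files from a job whose last step has a reducer:

python mr_your_job.py -r spark -D mapreduce.job.reduces=100 input1 input2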

However, this is a somewhat heavyweight solution; once Spark runs a step’s reducer, mrjob has to forbid Spark from re-partitioning until the end of the step.

A lighter-weight solution is --max-output-files, which limits the number of output files by running coalesce() just before writing output. Running your job with --max-output-files=100 would ensure it produces no more than 100 output files (though it could produce fewer).

Running classic MRJobs on Spark on EMR

It’s often faster to run classic MRJobs on Spark than Hadoop Streaming. It’s also convenient to be able to run on EMR rather than setting up your own Spark cluster (or SSH’ing in).

Can you do both? Yes! Run the job with the Spark runner, but tell it to use mrjob spark-submit to launch Spark jobs on EMR.

It looks something like this:

python mr_your_job.py -r spark \
  --spark-submit-bin 'mrjob spark-submit -r emr' \
  --spark-master yarn --spark-tmp-dir s3://your-bucket/tmp/ input1 input2

Note that because the Spark runner itself doesn’t know the job is going to run on EMR, you have to give it a couple of hints so that it knows it’s running on YARN (--spark-master) and that it needs to use S3 as its temp space (--spark-tmp-dir).