Spark

Why use mrjob with Spark?

mrjob augments Spark’s native Python support with features familiar to users of mrjob: automatic cluster setup, command-line options (including passthrough options), and easy ways to pass Python libraries and custom JARs to your job.

Writing your first Spark job

The simplest way to integrate mrjob with Spark is to add a spark() method to your MRJob class, and put your Spark code inside it.

Here’s how you’d implement a word frequency count job in Spark:

import re
from operator import add

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRSparkWordcount(MRJob):

    def spark(self, input_path, output_path):
        # Spark may not be available where script is launched
        from pyspark import SparkContext

        sc = SparkContext(appName='mrjob Spark wordcount script')

        lines = sc.textFile(input_path)

        counts = (
            lines.flatMap(lambda line: WORD_RE.findall(line))
            .map(lambda word: (word, 1))
            .reduceByKey(add))

        counts.saveAsTextFile(output_path)

        sc.stop()


if __name__ == '__main__':
    MRSparkWordcount.run()

Since Spark already supports Python, mrjob takes care of setting up your cluster, passes in input and output paths, and otherwise gets out of the way. If you pass in multiple input paths, input_path will be these paths joined by a comma (SparkContext.textFile() will accept this).

Note that pyspark is imported inside the spark() method. This allows your job to run whether pyspark is installed locally or not.

The spark() method can be used to execute arbitrary code, so there’s nothing stopping you from using SparkSession instead of SparkContext in Spark 2, or writing a streaming-mode job rather than a batch one.
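
For instance, here’s a minimal sketch of the same wordcount written against a SparkSession instead (assuming Spark 2 or later is available; WORD_RE and add are the same imports as in the example above):

def spark(self, input_path, output_path):
    # pyspark may not be available where the script is launched
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName(
        'mrjob Spark wordcount script').getOrCreate()

    lines = spark.sparkContext.textFile(input_path)

    counts = (
        lines.flatMap(lambda line: WORD_RE.findall(line))
        .map(lambda word: (word, 1))
        .reduceByKey(add))

    counts.saveAsTextFile(output_path)

    spark.stop()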

Running on your own Hadoop cluster

Run your script with -r hadoop:

python your_mr_spark_job.py -r hadoop input_file1 input_file2 > output

There isn’t currently a “local” or “inline” mode that works independently of Spark, but you can use the spark_master option to run in Spark’s local mode:

python your_mr_spark_job.py -r hadoop --spark-master local input > output

The Hadoop runner always submits jobs to Spark in client mode, though you could change this using the spark_args option.
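
If you’d rather configure this once, both spark_master and spark_args can also be set in mrjob.conf. Here’s a sketch that keeps the default yarn master but passes --deploy-mode cluster through to spark-submit (whether that suits your setup is up to you):

runners:
  hadoop:
    spark_master: yarn      # the default; "local" runs Spark's local mode
    spark_args:
    - --deploy-mode
    - cluster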

Also, note that if you set the Spark master to anything but yarn (the default), Spark will ignore archive files (see upload_archives).

Running on EMR

Run your script with -r emr:

python your_mr_spark_job.py -r emr input_file1 input_file2 > output

The default EMR image should work fine for most Spark 1 jobs.

If you want to run on Spark 2, please set image_version to 5.0.0 or higher:

python your_mr_spark2_job.py -r emr --image-version 5.0.0 input > output

EMR introduced Spark support in AMI version 3.8.0, but it’s best to avoid the 3.x AMIs if you can; they only support Python 2 and have trouble detecting when Spark jobs fail (instead silently producing no output).

The EMR runner always submits jobs to Spark in cluster mode, which it needs in order to access files on S3.

Passing in libraries

Use --py-file to pass in .zip or .egg files full of Python code:

python your_mr_spark_job.py -r hadoop --py-file lib1.zip --py-file lib2.egg

Or set py_files in mrjob.conf.
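
For example, the equivalent mrjob.conf entry might look like this (shown for the hadoop runner; the library paths are just placeholders):

runners:
  hadoop:
    py_files:
    - lib1.zip
    - lib2.egg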

Command-line options

Command-line options (passthrough options, etc.) work exactly like they do with regular streaming jobs. See Defining command line options.
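
For example, here’s a sketch of adding a passthrough option to the wordcount job and using it inside spark() (this uses the configure_args()/add_passthru_arg() names from recent mrjob versions; older versions call these configure_options() and add_passthrough_option(); WORD_RE and add are as in the first example, and the option name is just an illustration):

class MRSparkWordcount(MRJob):

    def configure_args(self):
        super(MRSparkWordcount, self).configure_args()
        self.add_passthru_arg('--min-count', type=int, default=1)

    def spark(self, input_path, output_path):
        from pyspark import SparkContext

        sc = SparkContext(appName='mrjob Spark wordcount script')

        # copy into a local variable so the lambda below doesn't capture self
        min_count = self.options.min_count

        counts = (
            sc.textFile(input_path)
            .flatMap(lambda line: WORD_RE.findall(line))
            .map(lambda word: (word, 1))
            .reduceByKey(add)
            .filter(lambda w_c: w_c[1] >= min_count))

        counts.saveAsTextFile(output_path)

        sc.stop()

You could then run the job with something like --min-count 5 on the command line.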

No setup scripts

Unlike with streaming jobs, you can’t wrap Spark jobs in setup scripts; once Spark starts operating on serialized data, it’s running pure Python/Java, and there’s no way to slip in a shell script.

If you’re running in EMR, you can use bootstrap scripts to set up your environment when the cluster is created.
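
For example, here’s a sketch of installing a Python package at cluster creation time with the bootstrap option in mrjob.conf (the package is just an illustration):

runners:
  emr:
    bootstrap:
    - sudo pip install numpy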

Multi-step jobs

There generally isn’t a need to define multiple Spark steps (Spark lets you map/reduce as many times as you want). However, it may sometimes be useful to pre- or post-process Spark data using a streaming or jar step.

This is accomplished by overriding your job’s steps() method and using the SparkStep class:

# MRStep and SparkStep live in mrjob.step
from mrjob.step import MRStep, SparkStep


def steps(self):
    return [
        MRStep(mapper=self.preprocessing_mapper),
        SparkStep(spark=self.spark),
    ]

External Spark scripts

mrjob can also be used to launch external (non-mrjob) Spark scripts using the SparkScriptStep class, which specifies the path (or URI) of the script and its arguments.

As with JarSteps, you can interpolate input and output paths using INPUT and OUTPUT constants. For example, you could set your job’s steps() method up like this:

import os

# SparkScriptStep and the INPUT/OUTPUT constants live in mrjob.step
from mrjob.step import INPUT, OUTPUT, SparkScriptStep


def steps(self):
    return [
        SparkScriptStep(
            script=os.path.join(os.path.dirname(__file__), 'my_spark_script.py'),
            args=[INPUT, '-o', OUTPUT, '--other-switch'],
        ),
    ]
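
For reference, my_spark_script.py itself is an ordinary spark-submit script rather than an MRJob. Here’s a minimal sketch matching the argument list above (the argument handling is hypothetical; a real script would probably use argparse):

import sys

from pyspark import SparkContext


def main():
    # args arrive as: <input path(s)> -o <output path> --other-switch
    input_path = sys.argv[1]
    output_path = sys.argv[sys.argv.index('-o') + 1]

    sc = SparkContext(appName='my_spark_script')

    sc.textFile(input_path).saveAsTextFile(output_path)

    sc.stop()


if __name__ == '__main__':
    main()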

Custom input and output formats

mrjob allows you to use input and output formats from custom JARs with Spark, just like you can with streaming jobs.

First download your JAR to the same directory as your job, and add it to your job class with the LIBJARS attribute:

LIBJARS = ['nicknack-1.0.0.jar']

Then use Spark’s own capabilities to reference your input or output format, keeping in mind the data types they expect.

For example, nicknack’s MultipleValueOutputFormat expects <Text,Text>, so if we wanted to integrate it with our wordcount example, we’d have to convert the count to a string:

def spark(self, input_path, output_path):
    from pyspark import SparkContext

    sc = SparkContext(appName='mrjob Spark wordcount script')

    lines = sc.textFile(input_path)

    counts = (
        lines.flatMap(lambda line: WORD_RE.findall(line))
        .map(lambda word: (word, 1))
        .reduceByKey(add))

    # MultipleValueOutputFormat expects Text, Text
    # w_c is (word, count)
    counts = counts.map(lambda w_c: (w_c[0], str(w_c[1])))

    counts.saveAsHadoopFile(output_path,
                            'nicknack.MultipleValueOutputFormat')

    sc.stop()