Why use mrjob with Spark?¶
mrjob augments Spark's native Python support with the following features familiar to users of mrjob:
- automatically parse logs to explain errors and other Spark job failures
- automatic matching of Python version (see python_bin)
- easily pass through environment variables (see cmdenv)
- support for libjars
- passthrough and file options (see Defining command line options)
- automatically upload input and other support files to HDFS or S3 (see upload_files, upload_archives, and py_files)
- automatically set up Spark on EMR (see bootstrap_spark)
- automatically make the mrjob library available to your job (see bootstrap_mrjob)
Writing your first Spark job¶
Here’s how you’d implement a word frequency count job in Spark:
import re

from operator import add

from mrjob.job import MRJob

WORD_RE = re.compile(r"[\w']+")


class MRSparkWordcount(MRJob):

    def spark(self, input_path, output_path):
        # Spark may not be available where script is launched
        from pyspark import SparkContext

        sc = SparkContext(appName='mrjob Spark wordcount script')

        lines = sc.textFile(input_path)

        counts = (
            lines.flatMap(lambda line: WORD_RE.findall(line))
            .map(lambda word: (word, 1))
            .reduceByKey(add))

        counts.saveAsTextFile(output_path)

        sc.stop()


if __name__ == '__main__':
    MRSparkWordcount.run()
Since Spark already supports Python, mrjob takes care of setting up your
cluster, passes in input and output paths, and otherwise gets out of the way.
If you pass in multiple input paths, input_path will be these paths joined by a comma (SparkContext.textFile() accepts this).
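For instance, if you launched the job above with two input files, the code inside spark() might effectively see something like this (paths hypothetical):

# input_path arrives as a single string; multiple inputs are joined by
# commas, e.g. 'data/part1.txt,data/part2.txt' (hypothetical paths)
lines = sc.textFile(input_path)  # textFile() accepts the comma-separated list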
pyspark is imported inside the spark() method. This allows your job to run whether or not pyspark is installed locally.

The spark() method can be used to execute arbitrary code, so there's nothing stopping you from using SparkSession instead of SparkContext in Spark 2, or writing a streaming-mode job rather than a batch one.
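As a minimal sketch (not part of the example above), a Spark 2 variant using SparkSession might look like this, assuming the same MRJob, WORD_RE, and add definitions as the word count job:

class MRSparkSessionWordcount(MRJob):

    def spark(self, input_path, output_path):
        # as above, import inside the method in case pyspark isn't
        # installed where the script is launched from
        from pyspark.sql import SparkSession

        spark = SparkSession.builder \
            .appName('mrjob Spark wordcount (SparkSession)') \
            .getOrCreate()

        # the underlying SparkContext still handles comma-joined paths
        lines = spark.sparkContext.textFile(input_path)

        counts = (
            lines.flatMap(lambda line: WORD_RE.findall(line))
            .map(lambda word: (word, 1))
            .reduceByKey(add))

        counts.saveAsTextFile(output_path)

        spark.stop()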
Running on your own Hadoop cluster¶
Run your script with:
python your_mr_spark_job -r hadoop input_file1 input_file2 > output
There isn’t currently a “local” or “inline” mode that works independently from Spark, but you can use the spark_master option to run in Spark’s local mode:
python your_mr_spark_job -r hadoop --spark-master local input > output
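If you'd rather not pass the switch every time, the same option can live in your config file; a minimal sketch, assuming the usual mrjob.conf layout:

# mrjob.conf (hypothetical)
runners:
  hadoop:
    spark_master: local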
The Hadoop runner always submits jobs to Spark in client mode, though you could change this using the spark_args option.
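For example, a hedged sketch of forcing cluster deploy mode through spark_args (these are ordinary spark-submit arguments, so --deploy-mode is Spark's own switch, not mrjob's):

# mrjob.conf (hypothetical)
runners:
  hadoop:
    spark_args:
    - --deploy-mode
    - cluster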
Also, note that if you set the Spark master to anything but yarn (the default), Spark will ignore archive files (see upload_archives).
Running on EMR¶
Run your script with:
python your_mr_spark_job -r emr input_file1 input_file2 > output
The default EMR image should work fine for most Spark 1 jobs.
If you want to run on Spark 2, please set image_version to 5.0.0 or higher:
python your_mr_spark2_job -r emr --image-version 5.0.0 input > output
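The same option can go in your config file instead; a minimal sketch, assuming the usual mrjob.conf layout:

# mrjob.conf (hypothetical)
runners:
  emr:
    image_version: '5.0.0'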
EMR introduced Spark support in AMI version 3.8.0, but it's best to avoid the 3.x AMIs if you can; they only support Python 2 and have trouble detecting when Spark jobs fail (silently producing no output instead).
The EMR runner always submits jobs to Spark in cluster mode, which it needs in order to access files on S3.
Passing in libraries¶
Use --py-file to pass in .zip or .egg files full of Python code:
python your_mr_spark_job -r hadoop --py-file lib1.zip --py-file lib2.egg
Or set py_files in your config file.
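A minimal sketch, matching the command line above (filenames hypothetical):

# mrjob.conf (hypothetical)
runners:
  hadoop:
    py_files:
    - lib1.zip
    - lib2.egg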
Command-line options (passthrough options, etc.) work exactly like they do with regular streaming jobs. See Defining command line options.
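As a minimal sketch of a passthrough option used by a Spark job (assuming a recent mrjob release where configure_args() and add_passthru_arg() exist; older releases spell these configure_options() and add_passthrough_option()):

class MRSparkWordFreqFilter(MRJob):

    def configure_args(self):
        super(MRSparkWordFreqFilter, self).configure_args()
        # hypothetical option: only keep words seen at least this many times
        self.add_passthru_arg('--min-count', type=int, default=1)

    def spark(self, input_path, output_path):
        from pyspark import SparkContext

        sc = SparkContext(appName='word count with --min-count')

        # copy to a local variable so the closure doesn't capture self
        min_count = self.options.min_count

        counts = (
            sc.textFile(input_path)
            .flatMap(lambda line: WORD_RE.findall(line))
            .map(lambda word: (word, 1))
            .reduceByKey(add)
            .filter(lambda w_c: w_c[1] >= min_count))

        counts.saveAsTextFile(output_path)

        sc.stop()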
No setup scripts¶
Unlike with streaming jobs, you can't wrap Spark jobs in setup scripts; once Spark starts operating on serialized data, it's running pure Python/Java, and there's no way to slip in a shell script.
If you’re running in EMR, you can use bootstrap scripts to set up your environment when the cluster is created.
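For example, a hedged sketch of installing a Python package at cluster creation time via the bootstrap option (the package and command here are hypothetical):

# mrjob.conf (hypothetical)
runners:
  emr:
    bootstrap:
    - sudo pip install simplejson   # hypothetical dependency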
Multi-step jobs¶
There generally isn't a need to define multiple Spark steps (Spark lets you map/reduce as many times as you want). However, it may sometimes be useful to pre- or post-process Spark data using a streaming step:
def steps(self):
    # MRStep and SparkStep come from mrjob.step
    return [
        MRStep(mapper=self.preprocessing_mapper),
        SparkStep(spark=self.spark),
    ]
External Spark scripts¶
mrjob can also be used to launch external (non-mrjob) Spark scripts, using the SparkScriptStep class, which specifies the path (or URI) of the script and its arguments:
def steps(self):
    # SparkScriptStep, INPUT, and OUTPUT come from mrjob.step
    return [
        SparkScriptStep(
            script=os.path.join(os.path.dirname(__file__), 'my_spark_script.py'),
            args=[INPUT, '-o', OUTPUT, '--other-switch'],
        ),
    ]
Custom input and output formats¶
mrjob allows you to use input and output formats from custom JARs with Spark, just like you can with streaming jobs.
For example, to make nicknack's JAR available to your job, set LIBJARS in your job class:

LIBJARS = ['nicknack-1.0.0.jar']
Then use Spark's own capabilities to reference your input or output format, keeping in mind the data types they expect. For example, nicknack's MultipleValueOutputFormat expects both key and value to be Text (i.e. strings), so if we wanted to integrate it with our wordcount example, we'd have to convert the count to a string:
def spark(self, input_path, output_path):
    from pyspark import SparkContext

    sc = SparkContext(appName='mrjob Spark wordcount script')

    lines = sc.textFile(input_path)

    counts = (
        lines.flatMap(lambda line: WORD_RE.findall(line))
        .map(lambda word: (word, 1))
        .reduceByKey(add))

    # MultipleValueOutputFormat expects Text, Text
    # w_c is (word, count)
    counts = counts.map(lambda w_c: (w_c[0], str(w_c[1])))

    counts.saveAsHadoopFile(output_path,
                            'nicknack.MultipleValueOutputFormat')

    sc.stop()
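Reading from a custom input format works the same way, through Spark's own APIs rather than mrjob's; a minimal sketch (the input format class below is hypothetical, not part of nicknack):

def spark(self, input_path, output_path):
    from pyspark import SparkContext

    sc = SparkContext(appName='custom input format example')

    # hadoopFile() takes the input format's class name plus the
    # Hadoop Writable classes it produces
    pairs = sc.hadoopFile(
        input_path,
        inputFormatClass='com.example.MyInputFormat',   # hypothetical
        keyClass='org.apache.hadoop.io.Text',
        valueClass='org.apache.hadoop.io.Text')

    pairs.saveAsTextFile(output_path)

    sc.stop()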