2 tunings you should make for Spark applications running on AWS EMR

Apache Spark provides hundreds of useful configurations for developers to optimize their applications. Here we share 2 universally useful tunings you should make for your Spark applications running on AWS EMR.

Apache Spark provides hundreds of configurable parameters that give developers room to optimize their applications for specific needs. While developing our anomaly detection application with Spark, we found 2 configurations that, when set properly, consistently improve application performance. We share them here so that future Spark developers can enhance their applications without the trial and error we went through.

Tuning 1. Kryo serialization

The largest downside of using multiple servers to process data is the time spent transferring data among them (i.e. the communication cost). Consequently, storing data as compactly as possible is important to reduce network time. When using Java or Scala to write Spark applications, you have 3 options for storing your objects – in their native form, with Java serialization, or with Kryo serialization. Among the three, Apache Spark recommends Kryo because it “is significantly faster and more compact than Java serialization (often as much as 10x).” The two downsides are that not all objects are serializable and that you have to tell your program the structure of the classes you want to Kryo-serialize (i.e. register your classes).

You can set your applications to use Kryo serialization by adding the following configuration when submitting a job to AWS EMR.

[
    {
        "Classification": "spark-defaults"
        , "Properties": {
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
        }
    }
]

Alternatively, you can set the configuration in your application:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Then you can register your classes in the following way:

conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
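
Spark can also be told to fail fast when it meets a class you forgot to register, via the spark.kryo.registrationRequired setting. Below is a minimal Scala sketch of the whole setup in one place; MyClass1 and MyClass2 are hypothetical placeholders for your own classes, and the app name is arbitrary.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical classes standing in for your own domain objects.
case class MyClass1(id: Long, value: Double)
case class MyClass2(name: String)

val conf = new SparkConf()
  .setAppName("kryo-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Throw an exception on unregistered classes instead of silently
  // falling back to writing full class names alongside each object.
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

val sc = new SparkContext(conf)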

If you use Python (i.e. PySpark) to write Spark applications, this tuning is not relevant because Kryo serialization is for Java objects. PySpark’s default way to store objects is to serialize them with the standard pickle library; when objects cannot be pickled, PySpark falls back to an improved version of CloudPickle. In most scenarios, we found this default behavior satisfactory (objects are serialized by default). If you want to optimize serialization further, you can set the serializer when you define the SparkContext:

# "Other serializers, like MarshalSerializer, support fewer datatypes but can be faster."
from pyspark import SparkContext
from pyspark.serializers import MarshalSerializer

sc = SparkContext('local', 'test', serializer=MarshalSerializer())


Tuning 2. Maximize resource allocation

AWS EMR offers various instance types to balance performance and cost. Different instance types have different implications for your applications, and it would be quite a lot of work to adjust the number of cores and the memory size every time you change instance types. To obviate such work, EMR sets the following configurations out of the box according to the number and types of instances (a short way to verify the chosen values follows the list):

  • spark.executor.memory
  • spark.executor.cores
  • spark.dynamicAllocation.enabled to true
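
If you want to confirm what EMR actually picked for your cluster, one quick check (a sketch in Scala, assuming you already have a SparkContext named sc, e.g. in spark-shell on the master node) is to read the resolved configuration at runtime:

// Print the executor and dynamic-allocation settings Spark is actually using.
sc.getConf.getAll
  .filter { case (key, _) =>
    key.startsWith("spark.executor.") || key.startsWith("spark.dynamicAllocation.")
  }
  .sorted
  .foreach { case (key, value) => println(s"$key = $value") }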

You can further enhance your application’s performance by using the values that AWS recommends for the following configurations:

  • spark.default.parallelism
  • spark.driver.memory
  • spark.executor.instances

Using the parameters AWS recommends is quite easy. Simply set maximizeResourceAllocation to true when submitting a job to the cluster, and the values above will be set according to the number and type of instances:

[
    {
        "Classification": "spark"
        , "Properties": {
            "maximizeResourceAllocation": "true"
        }
    }
]

In this scenario, dynamic allocation is disabled because setting maximizeResourceAllocation to true fixes spark.executor.instances. If the number of instances changes over the span of your application’s run time (e.g. it constantly takes input of varying sizes), set spark.dynamicAllocation.enabled to true as shown below. In that case, spark.default.parallelism and spark.driver.memory will still be set, but spark.executor.instances will not be.

[
    {
        "Classification": "spark"
        , "Properties": {
            "maximizeResourceAllocation": "true"
        }
    }, {
        "Classification": "spark-defaults"
        , "Properties": {
            "spark.dynamicAllocation.enabled": "true"
        }
    }
]
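
With dynamic allocation enabled, the number of executors grows and shrinks with the workload. A rough way to watch this happening (again just a sketch, assuming an existing SparkContext named sc) is to poll the executor list; getExecutorMemoryStatus returns one entry per executor plus one for the driver:

// Count currently registered executors; subtract 1 for the driver's own entry.
val executorCount = sc.getExecutorMemoryStatus.size - 1
println(s"Currently running executors: $executorCount")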


TL;DR

  1. Use Kryo serialization
  2. Use EMR’s maximizeResourceAllocation

    • If the size of your cluster changes frequently over time (the cluster constantly takes varying sizes of data), consider setting spark.dynamicAllocation.enabled to true
  3. If you are running into errors that are not familiar to you, our previous blog post on the first 3 frustrations you will encounter when migrating Spark applications to AWS EMR might help. Check it out.
