The first 3 frustrations you will encounter when migrating Spark applications to AWS EMR

Here we share the first 3 frustrations we had when migrating our anomaly detection Spark applications to EMR and how we worked around them.

Congratulations! You now have a Spark application that runs in your local environment. It is time to scale up and use a multitude of powerful servers to digest really large datasets with the fantastic app you just created. You look around and realize there are multiple options for hosting your Spark application. In our case, we chose EMR, a big data solution provided by AWS, over the others for two main reasons:

  • Our solution works with data provided by many different users, so the environment should scale up and down according to the sizes of the datasets they provide.
  • We wanted to focus on making our solution better rather than spending resources on setting up and maintaining the Spark environment.

Here we share the first 3 frustrations we encountered while migrating our anomaly detection Spark app to EMR, so that future Spark users can adopt EMR without the agony we went through. We worked with Spark 2.2.0, EMR 5.8.0, and PySpark.

Frustration 1. key not found: SPARK_HOME

Situation

Your Spark application runs well with one server, but as soon as you start using multiple servers, you see the following error message.

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.util.NoSuchElementException: key not found: SPARK_HOME

Reason

An appropriate Spark home location is set only on the master node, not on the core and task nodes.

Solution

Set the spark.yarn.appMasterEnv.SPARK_HOME parameter to "/usr/lib/spark".

If you use Python Boto3 to create a cluster, add the following to your configuration:

[{
    "Classification": "spark-defaults",
    "Properties": {
        "spark.yarn.appMasterEnv.SPARK_HOME": "/usr/lib/spark"
    }
}]

If you use the EMR console to create a cluster, click “Create Cluster,” then “Go to advanced options.” In the “Edit software settings (optional)” section, add the following:

classification=spark-defaults,properties=[spark.yarn.appMasterEnv.SPARK_HOME, /usr/lib/spark]
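For context, here is a minimal Boto3 sketch of where the spark-defaults configuration above goes when creating a cluster. The cluster name, region, instance types, and IAM roles below are placeholders, not values from our setup.

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # placeholder region

response = emr.run_job_flow(
    Name="spark-anomaly-detection",                  # placeholder cluster name
    ReleaseLabel="emr-5.8.0",                        # the EMR release used in this post
    Applications=[{"Name": "Spark"}],
    Configurations=[{
        "Classification": "spark-defaults",
        "Properties": {
            "spark.yarn.appMasterEnv.SPARK_HOME": "/usr/lib/spark"
        }
    }],
    Instances={
        "MasterInstanceType": "m4.xlarge",           # placeholder instance types
        "SlaveInstanceType": "m4.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",               # default EMR roles; adjust for your account
    ServiceRole="EMR_DefaultRole",
)
print(response["JobFlowId"])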


Frustration 2. Container killed by YARN for exceeding memory limits.

Situation

Your application runs well with small data, but as the data size grows, you start seeing the following error message.

ExecutorLostFailure (executor 132 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 15.2 GB of 15.2 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

You add more servers and switch to more expensive ones with more memory, but it does not help.

Reason

The container's virtual memory usage exceeded a threshold, so the NodeManager killed the container (the default limit is too strict).

Solution

Disable the NodeManager's virtual memory check on containers by setting yarn.nodemanager.vmem-check-enabled to false. If you use Python Boto3, add the following configuration:

[{
    "Classification": "yarn-site",
    "Properties": {
        "yarn.nodemanager.vmem-check-enabled": "false"
    }
}]

If you use the EMR console, add the following configuration:

classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled,false]
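If you end up applying both fixes from this post with Boto3, the two classifications can simply be passed together in the same Configurations list, for example:

configurations = [
    {
        "Classification": "spark-defaults",
        "Properties": {"spark.yarn.appMasterEnv.SPARK_HOME": "/usr/lib/spark"}
    },
    {
        "Classification": "yarn-site",
        "Properties": {"yarn.nodemanager.vmem-check-enabled": "false"}
    }
]

# Pass this list as the Configurations argument of run_job_flow(), as in the sketch in Frustration 1.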


Frustration 3. Only a couple of servers work

Situation

To handle large data, you use many powerful servers, but often only two of them process tasks while the others sit idle, as in the picture below.

Here you can see that only two executors are assigned tasks while others are enjoying their free time.

Reason

This is because RDD blocks are not distributed evenly across nodes. By default, to minimize communication cost, Spark schedules tasks on the nodes closest to their data. If one node holds most of the blocks, that node gets to work on most of the tasks.

Solution

There are multiple ways to redistribute an RDD across nodes (e.g., repartitioning functions such as repartition() and partitionBy()). In our experience, the most effective point to set the number of partitions is when first reading the data. Most Spark operations use the maximum number of partitions among their parent RDDs, so setting the right number of partitions when reading the data ensures that subsequent operations work with the right number of partitions too.

  • Bad practice: data = sc.textFile(path)
  • Good practice: data = sc.textFile(path, minPartitions=100)
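To sanity-check the result, you can inspect how the data was actually partitioned; a small sketch continuing the good-practice example above:

# Actual partition count; it can exceed minPartitions, which is only a lower bound.
print(data.getNumPartitions())

# Records per partition, to spot skewed partitions. Avoid this on very large datasets,
# since glom() materializes each partition as a list before the sizes are collected.
print(data.glom().map(len).collect())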

Setting an appropriate number of partitions is particularly important: if there are too few, an executor has to handle data that may not fit within its memory limit, which can cause Out of Memory (OOM) errors; if there are too many, the overall communication cost (moving RDD blocks among nodes) increases, resulting in poor performance.

What is a good number of partitions to use? It really depends on the problem you are dealing with. In our case, we had to avoid OOM errors for any size of data that users provide (indeed, OOM errors were the most frequent errors we saw) and wanted to make sure that every executor has at least one task to work on at any given step to maximize throughput. Thus, we set the number of partitions to the value of the spark.default.parallelism configuration. In EMR, if maximizeResourceAllocation is set, that value is "2X number of CPU cores available to YARN containers", which is large enough to ensure data is distributed across all nodes. This way, the number of partitions is also set dynamically rather than fixed to an arbitrary number (like 100) up front. In PySpark, you can read the configuration value as follows:

from pyspark import SparkConf

spark_conf = SparkConf()
parallelism = int(spark_conf.get("spark.default.parallelism", "0"))  # "0" if the key is not set
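Putting it together, a minimal sketch of how the value can be fed back into reading data; the S3 path is a placeholder, and the fallback of 100 partitions is our own choice, not an EMR default.

# Guard against the fallback value of 0 when the configuration is not set.
num_partitions = parallelism if parallelism > 0 else 100
data = sc.textFile("s3://your-bucket/your-data", minPartitions=num_partitions)  # placeholder path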


Okay, now that your Spark applications run without errors in EMR, let us suggest two universally applicable tunings that will optimize them further: 2 tunings you should make for Spark applications running in EMR.

