How to unittest PySpark applications

Testing PySpark applications in a local environment verifies their behavior without spawning multiple servers or incurring network costs. Nonetheless, writing a unittest for a PySpark application is different from writing one for a regular Python application because a PySpark application requires setting up a Spark context. This blog post shows 4 simple steps to unittest PySpark applications.

Congratulations! You have successfully installed PySpark and written PySpark applications in your local environment. One of the big advantages of using a local environment to develop a Spark application is that it does not involve spawning new servers and distributing data to multiple computers, which saves time and money. Nonetheless, testing PySpark applications in a local environment is a bit different because a Spark context has to be set up before the tests can run. Here we present 4 simple steps to unittest your PySpark application in your local environment using pytest-spark. This tutorial unittests the simple word-count application we wrote in our previous blog post “How to Hello-World your first Spark application (local environment)” and assumes that you have installed PySpark in a virtual environment following the steps described in that post.

Step 1. Install pytest-spark

pytest-spark is a pytest plugin that makes it convenient to unittest PySpark applications. It is written and maintained by Alex Markov. In a nutshell, it provides pytest fixtures that start a Spark context automatically at the beginning of a test session and stop it at the end. Install pytest-spark using the commands below:

~$ cd hello_world_spark  
~/hello_world_spark$ source env/bin/activate  
~/hello_world_spark$ pip install pytest-spark
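
If you want to confirm the plugin is wired up before moving on, one option is to drop a throwaway test like the sketch below into the project directory (the file name test_sanity.py and the test name are our own choices) and run pytest; it only checks that the spark_context fixture provided by pytest-spark can run a trivial job.

from pytest_spark import spark_context

def test_spark_context_is_available(spark_context):
    # A trivial job: count the three elements of a small RDD.
    assert spark_context.parallelize([1, 2, 3]).count() == 3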

Step 2. Write word_count function

The previous blog post implemented the word-count logic as a plain script. We want to wrap that logic in a function so it is easier to unit test. Copy and paste the following Python script into a file named word_count_function.py in our project directory hello_world_spark.

from operator import add

def word_count(data_rdd):
    # Pair each element with 1, sum the pairs per key, and sort by count (descending).
    counts = (data_rdd.map(lambda x: (x, 1))
              .reduceByKey(add)
              .sortBy(lambda x: x[1], ascending=False)
              .collect())
    return counts
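
If you want to poke at the function outside of a test, a minimal throwaway driver along the lines below should also work; the file name driver.py and the application name are our own choices for illustration.

from pyspark import SparkContext
from word_count_function import word_count

if __name__ == "__main__":
    # Run against a local, single-machine Spark context.
    sc = SparkContext("local", "word_count_driver")
    print(word_count(sc.parallelize(list("Hello World"))))
    sc.stop()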

Step 3. Write a test

Now it is time to write a test. Just make sure you import spark_context from pytest_spark and take it as an input argument of your test function; pytest then injects a ready-to-use Spark context into the test. Copy and paste the following Python script and save it as test_word_count_function.py in our project directory hello_world_spark.

from pytest_spark import spark_context
from word_count_function import word_count

def test_word_count_function(spark_context):
    # spark_context is the Spark context fixture provided by pytest-spark.
    counts = word_count(spark_context.parallelize(list("Hello World")))
    assert counts == [('l', 3), ('o', 2), (' ', 1), ('e', 1), ('d', 1), ('H', 1), ('r', 1), ('W', 1)]
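
One caveat: the assertion above depends on the exact order in which ties are returned, and sortBy only orders by count, not among elements with equal counts. If the ordering ever turns out to be flaky on your machine, a variant like the following (the test name is our own) compares the counts order-independently:

def test_word_count_function_as_dict(spark_context):
    # Converting to a dict ignores how ties with equal counts are ordered.
    counts = word_count(spark_context.parallelize(list("Hello World")))
    assert dict(counts) == {'l': 3, 'o': 2, ' ': 1, 'e': 1, 'd': 1, 'H': 1, 'r': 1, 'W': 1}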

Step 4. Run the test

Run the following command from the project directory and you will see a nicely colored test result like the one below.

$ pytest
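
If you want more detail, pytest -v lists every test by name, and pytest test_word_count_function.py restricts the run to just this file.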

[Screenshot: pytest-spark test result]

All source code is available on our GitHub. Happy testing!
