How to unit test PySpark applications
Testing PySpark applications in a local environment verifies their behavior without spawning multiple servers or incurring network costs. However, writing a unit test for a PySpark application is different from writing one for a regular Python application because a PySpark application requires setting up a Spark context. This blog post shows 4 simple steps to unit test PySpark applications.
Congratulations! You have successfully installed PySpark and written PySpark applications in your local environment. One of the big advantages of using a local environment to develop a Spark application is that it does not involve spawning new servers and distributing data to multiple machines, which saves time and money. Nonetheless, testing PySpark applications in a local environment is a bit different because a Spark context has to be set up before each test. Here we present 4 simple steps to unit test your PySpark application in your local environment using pytest-spark. This tutorial unit tests the simple word-count application we wrote in our previous blog post "How to Hello-World your first Spark application (local environment)" and assumes that you have installed PySpark in a virtual environment following the steps described in that post.
Step 1. Install pytest-spark
pytest-spark is a pytest plugin that makes it convenient to run unit tests on PySpark applications. It is written and maintained by Alex Markov. In a nutshell, it provides a pytest fixture that automatically starts a Spark context at the beginning of a test session and stops it at the end. Install pytest-spark using the commands below:
~$ cd hello_world_spark
~/hello_world_spark$ source env/bin/activate
~/hello_world_spark$ pip install pytest-spark
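If you want to confirm that the plugin is registered and its fixtures are available, you can ask pytest to list its fixtures (the grep filter is just a convenience); the output should include entries such as spark_context:
~/hello_world_spark$ pytest --fixtures | grep spark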
Step 2. Write a function
The previous blog post wrote the word-counting logic as a plain script. To make the logic easier to unit test, we move it into a function. Copy and paste the following Python script into a file named word_count_function.py in our project directory:
from operator import add


def word_count(data_rdd):
    # Count occurrences of each element, then sort by count in descending order.
    counts = (data_rdd.map(lambda x: (x, 1))
              .reduceByKey(add)
              .sortBy(lambda x: x[1], ascending=False)
              .collect())
    return counts
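If you'd like to sanity-check the function outside of a test run, a minimal standalone script (a sketch, not part of the original tutorial; the app name is arbitrary) could look like this:

from pyspark import SparkContext

from word_count_function import word_count

# Spin up a local Spark context, run the function on a small RDD, and clean up.
sc = SparkContext("local", "word_count_sanity_check")
print(word_count(sc.parallelize(list("Hello World"))))
sc.stop()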
Step 3. Write a test
Now it is time to write a test. Just make sure you import pytest_spark.spark_context and take it as an input argument of your test function. Copy and paste the following Python script and save it as test_word_count_function.py in our project directory:
from pytest_spark import spark_context

from word_count_function import word_count


def test_word_count_function(spark_context):
    # pytest-spark injects a ready-to-use Spark context via the spark_context fixture.
    counts = word_count(spark_context.parallelize(list("Hello World")))
    assert counts == [('l', 3), ('o', 2), (' ', 1), ('e', 1),
                      ('d', 1), ('H', 1), ('r', 1), ('W', 1)]
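Note that the order of elements with equal counts can depend on partitioning, so the exact list above is tied to this particular local setup. If you prefer a test that does not depend on tie order, one option (a sketch, not part of the original tutorial) is to compare dictionaries instead of lists; this test can live in the same file:

def test_word_count_ignores_tie_order(spark_context):
    counts = word_count(spark_context.parallelize(list("Hello World")))
    # Comparing as a dict checks the counts without pinning down the order of ties.
    assert dict(counts) == {'l': 3, 'o': 2, ' ': 1, 'e': 1,
                            'd': 1, 'H': 1, 'r': 1, 'W': 1}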
Step 4. Run the test
Run the following command and you will see a handsomely colored test result like the one below:
~/hello_world_spark$ pytest
[Screenshot: pytest-spark test result]
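pytest also offers plenty of command-line options if you want more control over a test run; for example, -v prints one line per test and -k selects tests by name:
~/hello_world_spark$ pytest -v
~/hello_world_spark$ pytest -k word_count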
All source code is available on our GitHub. Happy testing!