Как я unit test программы PySpark?
Мой текущий подход Java/Spark Unit Test работает (подробный здесь), создавая экземпляр SparkContext, используя "локальные" и запущенные модульные тесты с использованием JUnit.
Код должен быть организован для ввода/вывода в одной функции, а затем для вызова другого с несколькими RDD.
Это отлично работает. У меня очень проверенное преобразование данных, написанное на Java + Spark.
Могу ли я сделать то же самое с Python?
Как я могу запустить тесты Spark с помощью Python?
Ответы
Ответ 1
Я бы рекомендовал использовать py.test. py.test позволяет легко создавать повторно используемые тестовые приборы SparkContext и использовать его для написания кратких тестовых функций. Вы также можете специализировать приборы (например, для создания StreamingContext) и использовать один или несколько из них в своих тестах.
Я написал сообщение в блоге на Medium по этой теме:
https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b
Вот фрагмент сообщения:
pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
""" test word couting
Args:
spark_context: test fixture SparkContext
"""
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = spark_context.parallelize(test_input, 1)
results = wordcount.do_word_counts(input_rdd)
expected_results = {'hello':2, 'spark':3, 'again':1}
assert results == expected_results
Ответ 2
Я использую pytest
, который позволяет тестировать приборы, чтобы вы могли создавать экземпляр контекста pyspark и вводить его во все ваши тесты, которые его требуют. Что-то вдоль линий
@pytest.fixture(scope="session",
params=[pytest.mark.spark_local('local'),
pytest.mark.spark_yarn('yarn')])
def spark_context(request):
if request.param == 'local':
conf = (SparkConf()
.setMaster("local[2]")
.setAppName("pytest-pyspark-local-testing")
)
elif request.param == 'yarn':
conf = (SparkConf()
.setMaster("yarn-client")
.setAppName("pytest-pyspark-yarn-testing")
.set("spark.executor.memory", "1g")
.set("spark.executor.instances", 2)
)
request.addfinalizer(lambda: sc.stop())
sc = SparkContext(conf=conf)
return sc
def my_test_that_requires_sc(spark_context):
assert spark_context.textFile('/path/to/a/file').count() == 10
Затем вы можете запустить тесты в локальном режиме, вызвав py.test -m spark_local
или в YARN с помощью py.test -m spark_yarn
. Это сработало для меня хорошо.
Ответ 3
Вот решение с pytest, если вы используете Spark 2.x и SparkSession
. Я также импортирую сторонний пакет.
import logging
import pytest
from pyspark.sql import SparkSession
def quiet_py4j():
"""Suppress spark logging for the test context."""
logger = logging.getLogger('py4j')
logger.setLevel(logging.WARN)
@pytest.fixture(scope="session")
def spark_session(request):
"""Fixture for creating a spark context."""
spark = (SparkSession
.builder
.master('local[2]')
.config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
.appName('pytest-pyspark-local-testing')
.enableHiveSupport()
.getOrCreate())
request.addfinalizer(lambda: spark.stop())
quiet_py4j()
return spark
def test_my_app(spark_session):
...
Обратите внимание, что если использовать Python 3, мне пришлось указать это как переменную среды PYSPARK_PYTHON:
import os
import sys
IS_PY2 = sys.version_info < (3,)
if not IS_PY2:
os.environ['PYSPARK_PYTHON'] = 'python3'
В противном случае вы получите сообщение об ошибке:
Исключение: Python у рабочего имеет разную версию 2.7, чем в драйвер 3.5, PySpark не может работать с различными второстепенными версиями. Пожалуйста, проверить переменные окружения PYSPARK_PYTHON и PYSPARK_DRIVER_PYTHON правильно установлены.
Ответ 4
Предполагая, что у вас установлен pyspark
, вы можете использовать следующий класс для unitTest в unittest
:
import unittest
import pyspark
class PySparkTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
cls.sc = pyspark.SparkContext(conf=conf)
@classmethod
def tearDownClass(cls):
cls.sc.stop()
Пример:
class SimpleTestCase(PySparkTestCase):
def test_basic(self):
test_input = [
' hello spark ',
' hello again spark spark'
]
input_rdd = self.sc.parallelize(test_input, 1)
from operator import add
results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])
Ответ 5
Когда-то я тоже столкнулся с той же проблемой, и после прочтения нескольких статей, форумов и некоторых ответов StackOverflow я закончил писать небольшой плагин для pytest: pytest-spark
Я уже использую его в течение нескольких месяцев, и общий рабочий процесс выглядит хорошо в Linux:
- Установить Apache Spark (настроить JVM + распаковать дистрибутив Spark в какой-либо каталог)
- Установите "pytest" + плагин "pytest-spark"
- Создайте "pytest.ini" в каталоге проекта и укажите там место Spark.
- Запустите ваши тесты с помощью pytest, как обычно.
- При желании вы можете использовать "spark_context" в ваших тестах, которые предоставляются плагином, - он пытается минимизировать журналы Spark на выходе.