Как я unit test программы PySpark?

Мой текущий подход Java/Spark Unit Test работает (подробный здесь), создавая экземпляр SparkContext, используя "локальные" и запущенные модульные тесты с использованием JUnit.

Код должен быть организован для ввода/вывода в одной функции, а затем для вызова другого с несколькими RDD.

Это отлично работает. У меня очень проверенное преобразование данных, написанное на Java + Spark.

Могу ли я сделать то же самое с Python?

Как я могу запустить тесты Spark с помощью Python?

Ответы

Ответ 1

Я бы рекомендовал использовать py.test. py.test позволяет легко создавать повторно используемые тестовые приборы SparkContext и использовать его для написания кратких тестовых функций. Вы также можете специализировать приборы (например, для создания StreamingContext) и использовать один или несколько из них в своих тестах.

Я написал сообщение в блоге на Medium по этой теме:

https://engblog.nextdoor.com/unit-testing-apache-spark-with-py-test-3b8970dc013b

Вот фрагмент сообщения:

pytestmark = pytest.mark.usefixtures("spark_context")
def test_do_word_counts(spark_context):
    """ test word couting
    Args:
       spark_context: test fixture SparkContext
    """
    test_input = [
        ' hello spark ',
        ' hello again spark spark'
    ]

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello':2, 'spark':3, 'again':1}  
    assert results == expected_results

Ответ 2

Я использую pytest, который позволяет тестировать приборы, чтобы вы могли создавать экземпляр контекста pyspark и вводить его во все ваши тесты, которые его требуют. Что-то вдоль линий

@pytest.fixture(scope="session",
                params=[pytest.mark.spark_local('local'),
                        pytest.mark.spark_yarn('yarn')])
def spark_context(request):
    if request.param == 'local':
        conf = (SparkConf()
                .setMaster("local[2]")
                .setAppName("pytest-pyspark-local-testing")
                )
    elif request.param == 'yarn':
        conf = (SparkConf()
                .setMaster("yarn-client")
                .setAppName("pytest-pyspark-yarn-testing")
                .set("spark.executor.memory", "1g")
                .set("spark.executor.instances", 2)
                )
    request.addfinalizer(lambda: sc.stop())

    sc = SparkContext(conf=conf)
    return sc

def my_test_that_requires_sc(spark_context):
    assert spark_context.textFile('/path/to/a/file').count() == 10

Затем вы можете запустить тесты в локальном режиме, вызвав py.test -m spark_local или в YARN с помощью py.test -m spark_yarn. Это сработало для меня хорошо.

Ответ 3

Вот решение с pytest, если вы используете Spark 2.x и SparkSession. Я также импортирую сторонний пакет.

import logging

import pytest
from pyspark.sql import SparkSession

def quiet_py4j():
    """Suppress spark logging for the test context."""
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)


@pytest.fixture(scope="session")
def spark_session(request):
    """Fixture for creating a spark context."""

    spark = (SparkSession
             .builder
             .master('local[2]')
             .config('spark.jars.packages', 'com.databricks:spark-avro_2.11:3.0.1')
             .appName('pytest-pyspark-local-testing')
             .enableHiveSupport()
             .getOrCreate())
    request.addfinalizer(lambda: spark.stop())

    quiet_py4j()
    return spark


def test_my_app(spark_session):
   ...

Обратите внимание, что если использовать Python 3, мне пришлось указать это как переменную среды PYSPARK_PYTHON:

import os
import sys

IS_PY2 = sys.version_info < (3,)

if not IS_PY2:
    os.environ['PYSPARK_PYTHON'] = 'python3'

В противном случае вы получите сообщение об ошибке:

Исключение: Python у рабочего имеет разную версию 2.7, чем в драйвер 3.5, PySpark не может работать с различными второстепенными версиями. Пожалуйста, проверить переменные окружения PYSPARK_PYTHON и PYSPARK_DRIVER_PYTHON правильно установлены.

Ответ 4

Предполагая, что у вас установлен pyspark, вы можете использовать следующий класс для unitTest в unittest:

import unittest
import pyspark


class PySparkTestCase(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        conf = pyspark.SparkConf().setMaster("local[2]").setAppName("testing")
        cls.sc = pyspark.SparkContext(conf=conf)

    @classmethod
    def tearDownClass(cls):
        cls.sc.stop()

Пример:

class SimpleTestCase(PySparkTestCase):

    def test_basic(self):
        test_input = [
            ' hello spark ',
            ' hello again spark spark'
        ]

        input_rdd = self.sc.parallelize(test_input, 1)

        from operator import add

        results = input_rdd.flatMap(lambda x: x.split()).map(lambda x: (x, 1)).reduceByKey(add).collect()
        self.assertEqual(results, [('hello', 2), ('spark', 3), ('again', 1)])

Ответ 5

Когда-то я тоже столкнулся с той же проблемой, и после прочтения нескольких статей, форумов и некоторых ответов StackOverflow я закончил писать небольшой плагин для pytest: pytest-spark

Я уже использую его в течение нескольких месяцев, и общий рабочий процесс выглядит хорошо в Linux:

  • Установить Apache Spark (настроить JVM + распаковать дистрибутив Spark в какой-либо каталог)
  • Установите "pytest" + плагин "pytest-spark"
  • Создайте "pytest.ini" в каталоге проекта и укажите там место Spark.
  • Запустите ваши тесты с помощью pytest, как обычно.
  • При желании вы можете использовать "spark_context" в ваших тестах, которые предоставляются плагином, - он пытается минимизировать журналы Spark на выходе.