Как ускорить модульные тесты Spark SQL?
Я оцениваю Spark SQL для реализации простого модуля отчетности (несколько простых агрегатов по данным Avro, уже сохраненным на HDFS). Я не сомневаюсь, что Spark SQL может хорошо соответствовать как моим функциональным, так и нефункциональным требованиям.
Однако, помимо требований к производству, я хочу убедиться, что модуль будет тестироваться. Мы следуем подходу BDD с очень целенаправленными сценариями, что означает, что для этого модуля потребуется выполнить десятки/сотни SQL-запросов по некоторым очень простым данным (1..10 записи).
Чтобы получить приблизительное представление о производительности, которую я могу ожидать от Spark SQL в локальном режиме, я быстро прототипировал несколько тестов:
-
select count(*) from myTable
-
select key, count(*) from myTable group by key
Первый тест занимает в среднем 100 мс, а второй - 500 мс. Такая производительность неприемлема, что это сделает тестовый набор слишком медленным.
Для сравнения, я могу запустить тот же тест в 10 мс, используя Crunch и его MemPipeline (1500 мс с MRPipeline в локальном режиме), а также 1500 мс с Hive во встроенном режиме. Spark SQL, таким образом, немного быстрее MR в локальном режиме, но все же способ замедлить создание хороших тестовых наборов.
Можно ли ускорить работу Spark SQL в локальном режиме?
Есть ли лучший/более быстрый способ протестировать модуль Spark SQL?
(Я еще не профилировал выполнение, но поскольку a groupBy().countByKey()
на RDD в среднем составляет 40 мс, я ожидаю, что виновником станет оптимизатор запросов)
Ниже приведен мой быстрый и грязный тестовый код:
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName("poc-sparksql");
try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
SQLContext sqlCtx = new SQLContext(ctx);
for (int i = 0; i < ITERATIONS; i++) {
Stopwatch testCaseSw = new Stopwatch().start();
DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
df.registerTempTable("myTable");
DataFrame result = sqlCtx.sql("select count(*) from myTable");
System.out.println("Results: " + result.collectAsList());
System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
}
for (int i = 0; i < ITERATIONS; i++) {
Stopwatch testCaseSw = new Stopwatch().start();
DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
df.registerTempTable("myTable");
DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a ");
System.out.println("Results: " + result.collectAsList());
System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
}
}
Ответы
Ответ 1
Если вы ищете оптимизацию уровня ms, существуют различные указатели.
- Прочитайте свои данные один раз и кеш и просто SQL-запрос через него несколько раз. внутри загрузки цикла означает "создание новой задачи в каждом элементе".
DataFrame df = sqlCtx.load("/tmp/test.avro","com.databricks.spark.avro");
df.registerTempTable("myTable");
df.cache()
for (int i = 0; i < ITERATIONS; i++) {
Stopwatch testCaseSw = new Stopwatch().start();
DataFrame result = sqlCtx.sql("select count(*) from myTable");
// Dont do printLn inside the loop , save the output in some hashMap and print it later once the loop is complete
System.out.println("Results: " + result.collectAsList());
System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
}
- Извлеките System.out.println за пределы цикла, поскольку он потребляет некоторое время.
Пожалуйста, посмотрите:
http://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/
Ответ 2
Я использую библиотеку spark-testing-base
, разработанную Холденом Карау, для юнит-теста в Spark.
В относительном README.md
вы можете найти дополнительную информацию о настройке ресурсов, выделяемых для юнит-тестов.
Ответ 3
Здесь вы найдете информацию о том, насколько эффективно вы можете управлять разделами, чтобы повысить производительность, а также о наиболее распространенных ошибках, которые мы совершаем, и о том, как их исправить, чтобы повысить скорость. проверьте это здесь