Функции искрообразования и производительность UDF?
Spark теперь предлагает предопределенные функции, которые могут использоваться в информационных фреймах, и кажется, что они высоко оптимизированы. Мой первоначальный вопрос был о том, что будет быстрее, но я провел некоторое тестирование и обнаружил, что функции зажигания примерно в 10 раз быстрее, по крайней мере, в одном случае. Кто-нибудь знает, почему это так, и когда udf будет быстрее (только для случаев, когда существует идентичная функция искры)?
Вот мой тестовый код (запущенный в сообществе Databricks ed):
# UDF vs Spark function
from faker import Factory
from pyspark.sql.functions import lit, concat
fake = Factory.create()
fake.seed(4321)
# Each entry consists of last_name, first_name, ssn, job, and age (at least 1)
from pyspark.sql import Row
def fake_entry():
name = fake.name().split()
return (name[1], name[0], fake.ssn(), fake.job(), abs(2016 - fake.date_time().year) + 1)
# Create a helper function to call a function repeatedly
def repeat(times, func, *args, **kwargs):
for _ in xrange(times):
yield func(*args, **kwargs)
data = list(repeat(500000, fake_entry))
print len(data)
data[0]
dataDF = sqlContext.createDataFrame(data, ('last_name', 'first_name', 'ssn', 'occupation', 'age'))
dataDF.cache()
Функция UDF:
concat_s = udf(lambda s: s+ 's')
udfData = dataDF.select(concat_s(dataDF.first_name).alias('name'))
udfData.count()
Функция искры:
spfData = dataDF.select(concat(dataDF.first_name, lit('s')).alias('name'))
spfData.count()
Запускался несколько раз, udf обычно занимал около 1,1–1,4 с, а concat
функция Spark всегда выполнялась менее 0,15 с.
Ответы
Ответ 1
когда UDD будет быстрее
Если вы спросите о Python UDF, ответ, вероятно, никогда не будет *. Поскольку функции SQL относительно просты и не предназначены для сложных задач, практически невозможно компенсировать стоимость повторной сериализации, десериализации и перемещения данных между интерпретатором Python и JVM.
Кто-нибудь знает, почему это так
Основные причины уже перечислены выше и могут быть сведены к простому факту, что Spark DataFrame
изначально является структурой JVM, а стандартные методы доступа реализуются простыми вызовами Java API. UDF, с другой стороны, реализованы на Python и требуют перемещения данных вперед и назад.
Хотя PySpark в целом требует перемещения данных между JVM и Python, в случае низкоуровневого API RDD он обычно не требует дорогостоящей активности serde. Spark SQL добавляет дополнительную стоимость сериализации и сериализации, а также стоимость перемещения данных из небезопасного представления в JVM и обратно. Последний относится ко всем UDF (Python, Scala и Java), но первый относится к не родным языкам.
В отличие от UDF, функции Spark SQL работают непосредственно в JVM и обычно хорошо интегрированы как с Catalyst, так и с Tungsten. Это означает, что они могут быть оптимизированы в плане выполнения, и большую часть времени могут получить выгоду от оптимизации codgen и других вольфрамовых соединений. Более того, они могут работать с данными в "родном" представлении.
Таким образом, в некотором смысле проблема заключается в том, что Python UDF должен переносить данные в код, в то время как выражения SQL идут наоборот.
* По приблизительным оценкам UDF окна PySpark может превзойти оконную функцию Scala.
Ответ 2
Спустя годы, когда у меня появилось больше знаний и я снова взглянул на этот вопрос, я просто понял, что @alfredox действительно хочет спросить. Поэтому я снова пересмотрел и разделил ответ на две части:
Чтобы ответить, почему собственная функция DF (собственная функция Spark-SQL) работает быстрее:
По сути, почему встроенная функция Spark ВСЕГДА быстрее, чем Spark UDF, независимо от того, реализована ли ваша UDF в Python или Scala.
Во-первых, нам нужно понять, что такое вольфрам, который впервые появился в Spark 1.4.
Это бэкэнд и на чем он сосредоточен:
-
Управление памятью вне кучи с использованием двоичного представления данных в памяти, известного как формат строки Tungsten, и явное управление памятью,
- Локальность кэша, касающаяся вычислений с учетом кеша с разметкой с учетом кеша для высокой частоты обращений к кешу,
- Генерация кода для всего этапа (он же CodeGen).
Одним из крупнейших убийц производительности Spark является GC. GC приостановит все потоки в JVM, пока GC не завершит работу. Именно поэтому внедряется управление памятью вне кучи.
При выполнении собственных функций Spark-SQL данные остаются в вольфрамовом бэкенде. Однако в сценарии Spark UDF данные будут перемещены из вольфрама в JVM (сценарий Scala) или JVM и Python Process (Python) для выполнения фактического процесса, а затем вернутся обратно в вольфрам. В результате этого:
- Неизбежно, будут накладные расходы/штраф на:
- Десериализовать ввод из вольфрама.
- Сериализуйте вывод обратно в вольфрам.
- Даже при использовании Scala, первоклассного гражданина Spark, это увеличит объем памяти в JVM, что может привести к увеличению GC в JVM.
Эта проблема в точности связана с тем, что вольфрамовая функция "Управление памятью вне кучи" пытается решить.
Чтобы ответить, будет ли Python медленнее, чем Scala:
С 30 октября 2017 года компания Spark представила векторизованные файлы udf для pyspark.
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
Причина, по которой Python UDF работает медленно, заключается в том, что PySpark UDF реализован не самым оптимальным образом:
Согласно абзацу из ссылки.
Spark добавил Python API в версии 0.7 с поддержкой пользовательских функций. Эти пользовательские функции работают по одной строке за раз и, следовательно, страдают от высоких затрат на сериализацию и вызовы.
Однако новые векторизованные файлы udf, похоже, значительно улучшают производительность:
от 3х до 100х.
![enter image description here]()