Как сделать хорошие воспроизводимые примеры Apache Spark
Я потратил немало времени на чтение некоторых вопросов с pyspark и spark-dataframe, и очень часто я нахожу, что плакаты не предоставляют достаточной информации, чтобы действительно понять их вопрос. Обычно я комментирую просить их опубликовать MCVE, но иногда получение их для отображения некоторых данных ввода/вывода образца похоже на вытягивание зубов. Например: см. Комментарии к этому вопросу.
Возможно, часть проблемы заключается в том, что люди просто не знают, как легко создавать MCVE для блоков с искробезопасными данными. Я думаю, что было бы полезно иметь версию этого pandas вопроса об искровых кадрах > в качестве руководства, которое может быть связано.
Итак, как же создать хороший, воспроизводимый пример?
Ответы
Ответ 1
Предоставьте небольшие данные образцов, которые можно легко воссоздать.
По крайней мере, плакаты должны содержать пару строк и столбцов на их фрейме и коде, которые могут быть использованы для его простого создания. Легко, я имею в виду вырезать и вставить. Сделайте это как можно меньше, чтобы продемонстрировать свою проблему.
У меня есть следующий фреймворк:
+-----+---+-----+----------+
|index| X|label| date|
+-----+---+-----+----------+
| 1| 1| A|2017-01-01|
| 2| 3| B|2017-01-02|
| 3| 5| A|2017-01-03|
| 4| 7| B|2017-01-04|
+-----+---+-----+----------+
который может быть создан с помощью этого кода:
df = sqlCtx.createDataFrame(
[
(1, 1, 'A', '2017-01-01'),
(2, 3, 'B', '2017-01-02'),
(3, 5, 'A', '2017-01-03'),
(4, 7, 'B', '2017-01-04')
],
('index', 'X', 'label', 'date')
)
Покажите желаемый результат.
Задайте свой конкретный вопрос и покажите нам желаемый результат.
Как создать новый столбец 'is_divisible'
, который имеет значение 'yes'
, если день месяца 'date'
плюс 7 дней делится на значение в столбце 'X'
и 'no'
в противном случае?
Требуемый вывод:
+-----+---+-----+----------+------------+
|index| X|label| date|is_divisible|
+-----+---+-----+----------+------------+
| 1| 1| A|2017-01-01| yes|
| 2| 3| B|2017-01-02| yes|
| 3| 5| A|2017-01-03| yes|
| 4| 7| B|2017-01-04| no|
+-----+---+-----+----------+------------+
Объясните, как получить свой результат.
Объясните, в деталях, как вы получите желаемый результат. Это помогает показать пример расчета.
Например, в строке 1, X = 1 и date = 2017-01-01. Добавление 7 дней до даты выхода 2017-01-08. День месяца равен 8, и поскольку 8 делится на 1, ответ "да".
Аналогично, для последней строки X = 7 и даты = 2017-01-04. Добавление 7 к дате дает 11 как день месяца. Поскольку 11% 7 не равно 0, ответ "нет".
Поделитесь существующим кодом.
Покажите нам, что вы сделали или попробовали, включая все * кода, даже если он не работает. Сообщите нам, где вы застреваете, и если вы получили сообщение об ошибке, укажите сообщение об ошибке.
(* Вы можете оставить код для создания контекста искры, но вы должны включить весь импорт.)
Я знаю, как добавить новый столбец date
плюс 7 дней, но мне не удается получить день месяца как целое число.
from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))
Включить версии, импортировать и использовать подсветку синтаксиса
Для сообщений о настройке производительности включите план выполнения
- Подробнее в этом ответе, написанном user8371915.
- Он помогает использовать стандартизованные имена для контекстов.
Разбор исходных файлов с искровым выходом
- MaxU предоставил полезный код в этом ответе, чтобы помочь разобрать выходные файлы Spark в DataFrame.
Другие примечания.
Ответ 2
Настройка производительности
Если вопрос связан с настройкой производительности, пожалуйста, укажите следующую информацию.
План выполнения
Лучше всего включить расширенный план выполнения. В Python:
df.explain(True)
В Scala:
df.explain(true)
или расширенный план выполнения со статистикой. В Python:
print(df._jdf.queryExecution().stringWithStats())
в Scala:
df.queryExecution.stringWithStats
Информация о режиме и кластере
-
mode
- local
, client
, `cluster.
- Диспетчер кластеров (если применимо) - нет (локальный режим), автономный, YARN, Mesos, Kubernetes.
- Основная информация о конфигурации (количество ядер, память исполнителей).
Информация о времени
slow относительный, особенно когда вы переносите нераспространенное приложение или вы ожидаете низкой задержки. Точные тайминги для разных задач и этапов могут быть получены из пользовательского интерфейса Spark (sc.uiWebUrl
) jobs
или Spark REST UI.
Использовать стандартизованные имена для контекстов
Использование установленных имен для каждого контекста позволяет нам быстро воспроизвести проблему.
-
sc
- для SparkContext
.
-
sqlContext
- для sqlContext
.
-
spark
- для SparkSession
.
Предоставить информацию о типе (Scala)
Мощный вывод типа - одна из наиболее полезных функций Scala, но трудно анализировать код, полученный из контекста. Даже если тип является очевидным из контекста, лучше комментировать переменные. Предпочитают
val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))
над
val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))
Обычно используемые инструменты могут помочь вам:
Ответ 3
Хороший вопрос и ответ; некоторые дополнительные предложения:
Включите версию Spark
Искра все еще развивается, хотя и не так быстро, как в дни 1.x. Это всегда (но особенно, если вы используете несколько более старую версию), хорошая идея включить вашу рабочую версию. Лично я всегда начинаю свои ответы с:
spark.version
# u'2.2.0'
или
sc.version
# u'2.2.0'
Включение вашей версии Python тоже никогда не будет плохой идеей.
Включить все ваши импортные товары
Если ваш вопрос не строго касается Spark SQL и dataframes, например. если вы намерены использовать свою фреймворк в некоторых машинных процессах обучения, будьте откровенны в отношении импорта - см. этот вопрос, где импорт был добавлен в OP только после широкого обмена в (теперь удалены) комментарии (и оказалось, что эти неправильные импорты были основной причиной проблемы).
Почему это необходимо? Поскольку, например, этот LDA
from pyspark.mllib.clustering import LDA
отличается от этого LDA:
from pyspark.ml.clustering import LDA
первый из старого API на основе RDD (ранее Spark MLlib), а второй - из нового API на основе данных на основе данных (Spark ML).
Включить выделение кода
Хорошо, я признаю, что это субъективно: я считаю, что вопросы PySpark не должны помечены как python
по умолчанию; дело в том, что тег python
дает автоматическое выделение кода (и я считаю, что это основная причина для тех, кто использует его для вопросов PySpark). В любом случае, если вы согласитесь, и вам все равно нужен хороший, выделенный код, просто включите соответствующую директиву уценки:
<!-- language-all: lang-python -->
где-то в вашем посте до вашего первого фрагмента кода.
[UPDATE: Я запросил автоматическую подсветку синтаксиса для тегов pyspark
и sparkr
- приветствует большинство приветствий]
Ответ 4
Эта небольшая вспомогательная функция может помочь проанализировать выходные файлы Spark в DataFrame:
PySpark:
from pyspark.sql.functions import *
def read_spark_output(file_path):
step1 = spark.read \
.option("header","true") \
.option("inferSchema","true") \
.option("delimiter","|") \
.option("parserLib","UNIVOCITY") \
.option("ignoreLeadingWhiteSpace","true") \
.option("ignoreTrailingWhiteSpace","true") \
.option("comment","+") \
.csv("file://{}".format(file_path))
# select not-null columns
step2 = t.select([c for c in t.columns if not c.startswith("_")])
# deal with 'null' string in column
return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])
Scala:
// read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
val step1 = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", "|")
.option("parserLib", "UNIVOCITY")
.option("ignoreLeadingWhiteSpace", "true")
.option("ignoreTrailingWhiteSpace", "true")
.option("comment", "+")
.csv(filePath)
val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)
val columns = step2.columns
columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
Использование:
df = read_spark_output("file:///tmp/spark.out")
PS: для pyspark, eqNullSafe
доступен от spark 2.3
.