Pyspark использует одну задачу для mapPartitions при преобразовании rdd в dataframe
Я смущен тем, почему кажется, что Spark использует 1 задачу для rdd.mapPartitions
при преобразовании полученного RDD в DataFrame.
Это проблема для меня, потому что я хотел бы перейти от:
DataFrame
→ RDD
→ rdd.mapPartitions
→ DataFrame
чтобы я мог читать данные (DataFrame), применять не-SQL-функцию к кускам данных (mapPartitions on RDD), а затем преобразовывать обратно в DataFrame, чтобы я мог использовать процесс DataFrame.write
.
Я могу перейти из DataFrame → mapPartitions, а затем использовать RDD-writer, такой как saveAsTextFile, но это не так идеально, так как процесс DataFrame.write
может делать такие вещи, как перезапись и сохранение данных в формате Orc. Поэтому я хотел бы узнать, почему это происходит, но с практической точки зрения я в первую очередь забочусь о том, чтобы просто перейти от DataFrame → mapParitions → к использованию процесса DataFrame.write.
Вот пример воспроизводимости. Следующее работает, как ожидалось, с 100 задачами для работы mapPartitions
:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession \
.builder \
.master("yarn-client") \
.enableHiveSupport() \
.getOrCreate()
sc = spark.sparkContext
df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)
def f(part):
return [(1,2)]
spark_df.rdd.mapPartitions(f).collect()
Однако, если последняя строка изменяется на что-то вроде spark_df.rdd.mapPartitions(f).toDF().show()
, тогда для работы mapPartitions
будет только одна задача.
Некоторые скриншоты, иллюстрирующие это ниже:
![введите описание изображения здесь]()
Ответы
Ответ 1
DataFrame.show()
показывает только первое число строк вашего фрейма данных, по умолчанию только первое 20. Если это число меньше количества строк на раздел, Spark ленив и оценивает только один раздел, что эквивалентно к одной задаче.
Вы также можете сделать collect
на фрейме данных, чтобы вычислить и собрать все разделы и снова увидеть 100 задач.
Вы по-прежнему увидите задачу runJob
сначала, как и прежде, что вызвано вызовом toDF
, чтобы иметь возможность определить результирующую схему данных: необходимо обработать один раздел, чтобы иметь возможность определять типы вывода вашей функции отображения. После этого начального этапа фактическое действие, такое как collect
, произойдет во всех партитонах. Например, для меня ваш фрагмент с последней заменой строки spark_df.rdd.mapPartitions(f).toDF().collect()
приводит к следующим этапам:
![введите описание изображения здесь]()