Pyspark использует одну задачу для mapPartitions при преобразовании rdd в dataframe

Я смущен тем, почему кажется, что Spark использует 1 задачу для rdd.mapPartitions при преобразовании полученного RDD в DataFrame.

Это проблема для меня, потому что я хотел бы перейти от:

DataFrameRDDrdd.mapPartitionsDataFrame

чтобы я мог читать данные (DataFrame), применять не-SQL-функцию к кускам данных (mapPartitions on RDD), а затем преобразовывать обратно в DataFrame, чтобы я мог использовать процесс DataFrame.write.

Я могу перейти из DataFrame → mapPartitions, а затем использовать RDD-writer, такой как saveAsTextFile, но это не так идеально, так как процесс DataFrame.write может делать такие вещи, как перезапись и сохранение данных в формате Orc. Поэтому я хотел бы узнать, почему это происходит, но с практической точки зрения я в первую очередь забочусь о том, чтобы просто перейти от DataFrame → mapParitions → к использованию процесса DataFrame.write.

Вот пример воспроизводимости. Следующее работает, как ожидалось, с 100 задачами для работы mapPartitions:

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession \
    .builder \
    .master("yarn-client") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext

df = pd.DataFrame({'var1':range(100000),'var2': [x-1000 for x in range(100000)]})
spark_df = spark.createDataFrame(df).repartition(100)

def f(part):
    return [(1,2)]

spark_df.rdd.mapPartitions(f).collect()

Однако, если последняя строка изменяется на что-то вроде spark_df.rdd.mapPartitions(f).toDF().show(), тогда для работы mapPartitions будет только одна задача.

Некоторые скриншоты, иллюстрирующие это ниже: введите описание изображения здесь введите описание изображения здесь

Ответы

Ответ 1

DataFrame.show() показывает только первое число строк вашего фрейма данных, по умолчанию только первое 20. Если это число меньше количества строк на раздел, Spark ленив и оценивает только один раздел, что эквивалентно к одной задаче.

Вы также можете сделать collect на фрейме данных, чтобы вычислить и собрать все разделы и снова увидеть 100 задач.

Вы по-прежнему увидите задачу runJob сначала, как и прежде, что вызвано вызовом toDF, чтобы иметь возможность определить результирующую схему данных: необходимо обработать один раздел, чтобы иметь возможность определять типы вывода вашей функции отображения. После этого начального этапа фактическое действие, такое как collect, произойдет во всех партитонах. Например, для меня ваш фрагмент с последней заменой строки spark_df.rdd.mapPartitions(f).toDF().collect() приводит к следующим этапам:

введите описание изображения здесь