Как работает функция pyspark mapPartitions?
Итак, я пытаюсь изучить Spark с помощью Python (Pyspark). Я хочу знать, как работает функция mapPartitions
. Это то, что вводит он и какой результат он дает. Я не мог найти подходящего примера из Интернета. Допустим, у меня есть объект RDD, содержащий списки, например ниже.
[ [1, 2, 3], [3, 2, 4], [5, 2, 7] ]
И я хочу удалить элемент 2 из всех списков, как бы достичь этого, используя mapPartitions
.
Ответы
Ответ 1
mapPartition следует рассматривать как операцию отображения над разделами, а не над элементами раздела. Входные данные - это набор текущих разделов, а выходные данные - другой набор разделов.
Функция, которую вы передаете карте, должна принимать отдельный элемент вашего RDD
Функция, которую вы передаете mapPartition, должна принимать итерацию вашего RDD-типа и возвращать и повторяемую какого-либо другого или того же типа.
В вашем случае вы, вероятно, просто хотите сделать что-то вроде
def filter_out_2(line):
return [x for x in line if x != 2]
filtered_lists = data.map(filterOut2)
если вы хотите использовать mapPartition, это будет
def filter_out_2_from_partition(list_of_lists):
final_iterator = []
for sub_list in list_of_lists:
final_iterator.append( [x for x in sub_list if x != 2])
return iter(final_iterator)
filtered_lists = data.mapPartition(filterOut2FromPartion)
Ответ 2
Проще использовать mapPartitions с функцией генератора, используя синтаксис yield
:
def filter_out_2(partition):
for element in partition:
if element != 2:
yield element
filtered_lists = data.mapPartitions(filter_out_2)
Ответ 3
Нужна последняя версия
def filter_out_2(partition):
for element in partition:
sec_iterator = []
for i in element:
if i!= 2:
sec_iterator.append(i)
yield sec_iterator
filtered_lists = data.mapPartitions(filter_out_2)
for i in filtered_lists.collect(): print(i)