Просмотр содержимого RDD в Python Spark?
Запуск простого приложения в pyspark.
f = sc.textFile("README.md")
wc = f.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
Я хочу просмотреть содержимое RDD с помощью действия foreach:
wc.foreach(print)
Это вызывает синтаксическую ошибку:
SyntaxError: invalid syntax
Что мне не хватает?
Ответы
Ответ 1
Эта ошибка связана с тем, что print
не является функцией в Python 2.6.
Вы можете определить вспомогательный UDF, который выполняет печать, или использовать библиотеку __future__ для обработки print
как функции:
>>> from operator import add
>>> f = sc.textFile("README.md")
>>> def g(x):
... print x
...
>>> wc.foreach(g)
или
>>> from __future__ import print_function
>>> wc.foreach(print)
Однако, я думаю, было бы лучше использовать collect()
, чтобы вернуть содержимое RDD в драйвер, потому что foreach
выполняется на рабочих узлах, и выходы могут не обязательно появляться в вашем драйвере/оболочке (возможно, это будет в режиме local
, но не при работе в кластере).
>>> for x in wc.collect():
... print x
Ответ 2
В Spark 2.0 (я не тестировал более ранние версии). Просто:
print myRDD.take(n)
Где n - количество строк, а myRDD - wc в вашем случае.
Ответ 3
Если вы хотите увидеть содержимое RDD, тогда yes collect - это один параметр, но он извлекает все данные в драйвер, поэтому может возникнуть проблема
<rdd.name>.take(<num of elements you want to fetch>)
Лучше, если вы хотите увидеть только образец
Запуск foreach и попытка печати, я не рекомендую это, потому что если вы используете это в кластере, тогда журналы печати будут локальными для исполнителя и будут печатать для данных, доступных этому исполнителю, Оператор print не меняет состояние, поэтому он не является логически неправильным. Чтобы получить все журналы, вам нужно будет сделать что-то вроде
**Pseudocode**
collect
foreach print
Но это может привести к поломке задания, так как сбор всех данных о драйвере может привести к его сбою. Я бы предложил использовать команду принять, или если вы хотите ее проанализировать, используйте образец для сбора на драйвере или записи в файл, а затем проанализируйте его.
Ответ 4
Попробуйте следующее:
data = f.flatMap(lambda x: x.split(' '))
map = data.map(lambda x: (x, 1))
mapreduce = map.reduceByKey(lambda x,y: x+y)
result = mapreduce.collect()
Обратите внимание, что при запуске collect() RDD - это распределенный набор данных, агрегируется в драйвере node и по существу преобразуется в список. Таким образом, очевидно, что собирать() набор данных 2T не рекомендуется. Если вам нужно всего несколько образцов из вашего RDD, используйте take (10).
Ответ 5
В последнем документе вы можете использовать rdd.collect(). foreach (println) в драйвере для отображения всех, но это может вызвать проблемы с памятью в драйвере, лучше всего использовать rdd.take(wish_number)
https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
Чтобы напечатать все элементы в драйвере, можно использовать метод collect(), чтобы сначала привести RDD к драйверу node таким образом: rdd.collect(). foreach (println). Это может привести к тому, что драйвер исчерпает память, потому что collect() извлекает весь RDD на одну машину; если вам нужно всего лишь напечатать несколько элементов RDD, более безопасный подход - использовать метод take(): rdd.take(100).foreach(println).
Ответ 6
Вы можете просто собрать весь RDD (который вернет список) и распечатать указанный список:
print(wc.collect)