Ошибка порога PySpark Метод __getnewargs __ ([]) не существует
У меня есть набор файлов. Путь к файлам сохраняется в файле. Скажем, "all_files.txt" . Используя искру apache, мне нужно выполнить операцию над всеми файлами и собрать результаты.
Шаги, которые я хочу сделать, следующие:
- Создайте RDD, прочитав "all_files.txt"
- Для каждой строки в "all_files.txt" (каждая строка - это путь к некоторому файлу)
прочитайте содержимое каждого из файлов в один RDD
- Затем выполните операцию со всем содержимым
Это код, который я написал для него:
def return_contents_from_file (file_name):
return spark.read.text(file_name).rdd.map(lambda r: r[0])
def run_spark():
file_name = 'path_to_file'
spark = SparkSession \
.builder \
.appName("PythonWordCount") \
.getOrCreate()
counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
.flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
.flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files
Это вызывает ошибку:
строка 323, в get_return_value py4j.protocol.Py4JError: ошибка произошло при вызове o25. getnewargs. След: py4j.Py4JException: Метод getnewargs ([]) не существует при py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) в py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) на py4j.Gateway.invoke(Gateway.java:272) в py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) на py4j.commands.CallCommand.execute(CallCommand.java:79) в py4j.GatewayConnection.run(GatewayConnection.java:214) в java.lang.Thread.run(Thread.java:745)
Может кто-нибудь, пожалуйста, скажите мне, что я делаю неправильно, и как я должен идти дальше. Спасибо заранее.
Ответы
Ответ 1
Использование spark
внутри flatMap
или любое преобразование, которое происходит у исполнителей, недопустимо (spark
session доступен только для драйвера). Также невозможно создать RDD из RDD (см. Возможно ли создание вложенных RDD в Apache Spark?)
Но вы можете добиться этого преобразования по-другому - прочитайте все содержимое all_files.txt
в dataframe, используйте local map
, чтобы сделать их dataframes и local reduce
для объединения всех, см. пример:
>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)
Ответ 2
Я столкнулся с этой проблемой сегодня, наконец, выяснил, что я ссылался на объект spark.DataFrame
в pandas_udf
, что приводит к этой ошибке.
Заключение:
Вы не можете использовать объект sparkSession
, объект spark.DataFrame
или другие распределенные объекты Spark в udf
и pandas_udf
, потому что они не выбраны.
Если вы встречаете эту ошибку и используете udf
, проверьте ее внимательно, должна быть относительная проблема.