Как переписать выходной каталог в искру
У меня есть приложение для искрообразования, которое создает каждую базу данных.
Мне нужно сохранить/перезаписать результаты обработанных данных.
Когда я попытался перезаписать набор данных, org.apache.hadoop.mapred.FileAlreadyExistsException останавливает выполнение.
Я установил свойство Spark set("spark.files.overwrite","true")
, но вам не повезло.
Как перезаписать или предварительно настроить файлы из искры?
Ответы
Ответ 1
ОБНОВЛЕНИЕ: Предложить использование Dataframes
, плюс что-то вроде ....write.mode(SaveMode.Overwrite)...
Для старых версий попробуйте
yourSparkConf.set("spark.hadoop.validateOutputSpecs", "false")
val sc = SparkContext(yourSparkConf)
В версии 1.1.0 вы можете установить параметры конфигурации, используя скрипт spark-submit с флагом --conf.
ПРЕДУПРЕЖДЕНИЕ (более старые версии): Согласно @piggybox, в Spark есть ошибка, из-за которой он будет перезаписывать только те файлы, которые ему необходимы для записи его part-
файлов, остальные файлы будут part-
.
Ответ 2
поскольку df.save(path, source, mode)
устарел, (http://spark.apache.org/docs/1.5.0/api/scala/index.html#org.apache.spark.sql.DataFrame)
использовать df.write.format(source).mode("overwrite").save(path)
где df.write - DataFrameWriter
'source' может быть ( "com.databricks.spark.avro" | "паркет" | "json" )
Ответ 3
Документация для параметра spark.files.overwrite
говорит следующее: "Записывать ли файлы, добавленные через SparkContext.addFile()
, когда целевой файл существует, а его содержимое не совпадает с содержимым источника". Таким образом, это не влияет на метод saveAsTextFiles.
Вы можете сделать это до сохранения файла:
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("hdfs://localhost:9000"), hadoopConf)
try { hdfs.delete(new org.apache.hadoop.fs.Path(filepath), true) } catch { case _ : Throwable => { } }
Аас объясняется здесь:
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-make-Spark-1-0-saveAsTextFile-to-overwrite-existing-file-td6696.html
Ответ 4
В документации pyspark.sql.DataFrame.save (в настоящее время в 1.3.1) вы можете указать mode='overwrite'
при сохранении DataFrame:
myDataFrame.save(path='myPath', source='parquet', mode='overwrite')
Я проверил, что это даже удалит файлы с файлами разделов. Итак, если вы сказали изначально 10 разделов/файлов, но затем перезаписали папку с DataFrame, в которой было только 6 разделов, результирующая папка будет иметь 6 разделов/файлов.
Дополнительную информацию о параметрах режима см. в документации Spark SQL.
Ответ 5
df.write.mode('overwrite').parquet("/output/folder/path")
работает, если вы хотите перезаписать файл паркета, используя python. Это в искре 1.6.2. API может отличаться в более поздних версиях
Ответ 6
val jobName = "WordCount";
//overwrite the output directory in spark set("spark.hadoop.validateOutputSpecs", "false")
val conf = new
SparkConf().setAppName(jobName).set("spark.hadoop.validateOutputSpecs", "false");
val sc = new SparkContext(conf)
Ответ 7
Эта перегруженная версия функции Сохранить работает для меня:
yourDF.save(outputPath, org.apache.spark.sql.SaveMode.valueOf( "Перезаписать" ))
Приведенный выше пример заменит существующую папку. Сабемод также может принимать эти параметры (https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/SaveMode.html):
Добавить. Режим добавления означает, что при сохранении DataFrame в источнике данных, если данные/таблица уже существует, ожидается, что содержимое DataFrame будет добавлено к существующим данным.
ErrorIfExists: режим ErrorIfExists означает, что при сохранении DataFrame в источнике данных, если данные уже существуют, ожидается, что будет выбрано исключение.
Игнорировать. Режим игнорирования означает, что при сохранении DataFrame в источнике данных, если данные уже существуют, ожидается, что операция сохранения не сохранит содержимое DataFrame и не изменит существующие данные.
Ответ 8
Если вы хотите использовать свой собственный формат вывода, вы также сможете получить желаемое поведение с помощью RDD.
Посмотрите на следующие классы:
FileOutputFormat,
FileOutputCommitter
В формате вывода файла у вас есть метод с именем checkOutputSpecs, который проверяет, существует ли выходной каталог.
В FileOutputCommitter у вас есть commitJob, который обычно переносит данные из временного каталога в его конечное место.
Я еще не смог его проверить (сделаю это, как только у меня будет несколько бесплатных минут), но теоретически: если я расширяю FileOutputFormat и переопределяю checkOutputSpecs методу, который не создает исключение из каталога, уже существует, и отредактируйте метод commitJob моего настраиваемого коммиттера вывода, чтобы выполнить ту логику, которую я хочу (например, переопределить некоторые файлы, добавить другие), чем я могу также добиться желаемого поведения с помощью RDD.
Формат вывода передается: saveAsNewAPIHadoopFile (который является методом saveAsTextFile, а также фактически сохраняет файлы). И коммиттер вывода настроен на уровне приложения.