Искра: запись DataFrame как сжатого JSON
Apache Spark DataFrameReader.json()
может обрабатывать файлы gzipped JSONlines автоматически, но, похоже, нет способа получить DataFrameWriter.json()
для записи сжатых файлов JSONlines. Дополнительный сетевой ввод-вывод очень дорог в облаке.
Есть ли способ решить эту проблему?
Ответы
Ответ 1
В следующих решениях используется pyspark, но я предполагаю, что код в Scala будет похож.
Первый вариант - установить следующее, когда вы инициализируете SparkConf:
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
С кодом выше любого файла, который вы производите, используя этот sparkContext, автоматически сжимается с помощью gzip.
Второй вариант, если вы хотите сжать только выбранные файлы в вашем контексте. Допустим, что "df" - это ваш фрейм данных и имя файла для вашего назначения:
df_rdd = self.df.toJSON()
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
Ответ 2
С Spark 2.X(и, возможно, раньше я не тестировал) существует более простой способ записи сжатого JSON, который не требует изменения конфигурации:
val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")
Это также работает для CSV и для Parquet, просто используйте .csv() и .parquet() вместо .json(), чтобы записать файл после установки опции сжатия.
Возможные кодеки: none, bzip2, deflate, gzip, lz4 и snappy.
Ответ 3
Установка параметров сжатия в SparkConf
НЕ является хорошей практикой, как принятый ответ. Это изменило поведение глобально вместо того, чтобы указывать настройки для каждого файла отдельно. Правда в том, что явное всегда лучше, чем неявное. Есть также некоторые случаи, когда пользователи не могут легко манипулировать конфигурацией контекста, например spark-shell или в кодах, разработанных как подмодуль другого.
Правильный путь
Написание DataFrame
со сжатием поддерживается начиная с Spark 1.4. Несколько способов достичь этого:
Один
df.write.json("filename.json", compression="gzip")
Это! Просто используйте DataFrameWriter.json()
как вы хотите.
Волшебство скрыто в коде pyspark/sql/readwriter.py
@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
"""Saves the content of the :class:'DataFrame' in JSON format
('JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>'_) at the
specified path.
:param path: the path in any Hadoop supported file system
:param mode: ...
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param dateFormat: ...
:param timestampFormat: ...
>>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
"""
self.mode(mode)
self._set_opts(
compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
self._jwrite.json(path)
Поддерживаемые форматы сжатия: bzip2, gzip, lz4, snappy и deflate, без учета регистра.
Scala API должен быть таким же.
Другая
df.write.options(compression="gzip").json("filename.json")
Подобно тому, как указано выше. в качестве аргументов ключевых слов можно указать больше параметров. доступно начиная с Spark 1.4.
В третьих
df.write.option("compression", "gzip").json("filename.json")
DataFrameWriter.option()
добавлен начиная с DataFrameWriter.option()
Spark 1.5. Только один параметр может быть добавлен за один раз.
Ответ 4
Очень хороший ответ @ttimasdf. Однако опция # 1 не работает при развертывании HortonWorks в версии 1.6.3. Варианты № 2 и № 3 работают хорошо.