Как импортировать несколько файлов csv в одной загрузке?
У меня есть определенная схема для загрузки 10 csv файлов в папку. Есть ли способ автоматически загружать таблицы, используя Spark SQL. Я знаю, что это может быть выполнено с использованием отдельного фрейма данных для каждого файла [приведенного ниже], но может ли он быть автоматизирован с помощью одной команды, а не указывать файл, я могу указать папку?
df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("../Downloads/2008.csv")
Ответы
Ответ 1
Используйте подстановочный знак, например. замените 2008
на *
:
df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load("../Downloads/*.csv") // <-- note the star (*)
Spark 2.0
// these lines are equivalent in Spark 2.0
spark.read.format("csv").option("header", "true").load("../Downloads/*.csv")
spark.read.option("header", "true").csv("../Downloads/*.csv")
Примечания:
-
Замените format("com.databricks.spark.csv")
, используя вместо этого метод format("csv")
или csv
. Формат com.databricks.spark.csv
был интегрирован в 2.0.
-
Используйте spark
not sqlContext
Ответ 2
Дайджест Reader: (Spark 2.x)
Например, если у вас есть 3 каталога, в которых хранятся файлы csv:
dir1, dir2, dir3
Затем вы определяете пути как строку списка путей с разделителями-запятыми следующим образом:
пути= "dir1/, dir2/, dir3/*"
Затем используйте следующую функцию и передайте ей эту переменную path
def get_df_from_csv_paths(paths):
df = spark.read.format("csv").option("header", "false").\
schema(custom_schema).\
option('delimiter', '\t').\
option('mode', 'DROPMALFORMED').\
load(paths.split(','))
return df
Затем:
df = get_df_from_csv_paths(paths)
Вы получите в df единую световую фреймворк, содержащий данные из всех csvs, найденных в этих 3 каталогах.
============================================ ===============================
Полная версия:
Если вы хотите использовать несколько CSV файлов из нескольких каталогов, вам просто нужно передать список и использовать подстановочные знаки.
Пример:
если ваш путь data_path выглядит следующим образом:
's3://bucket_name/subbucket_name/2016-09 - */184/*,
s3://bucket_name/subbucket_name/2016-10 - */184/*,
s3://bucket_name/subbucket_name/2016-11 - */184/*,
s3://bucket_name/subbucket_name/2016-12 - */184/*,... '
вы можете использовать указанную выше функцию для одновременного глотания всех csvs во всех этих каталогах и подкаталогах:
Это запустит все каталоги в s3 bucket_name/subbucket_name/в соответствии с указанными шаблонами подстановок. например первый шаблон будет выглядеть в
bucket_name/subbucket_name/
для всех каталогов с именами, начинающимися с
2016-09 -
и для каждого из них берется только каталог с именем
184
и внутри этого подкаталога искать все файлы csv.
И это будет выполнено для каждого из шаблонов в списке с разделителями-запятыми.
Это работает лучше, чем объединение.
Ответ 3
Обратите внимание, что вы можете использовать другие трюки, например:
-- One or more wildcard:
.../Downloads20*/*.csv
-- braces and brackets
.../Downloads201[1-5]/book.csv
.../Downloads201{11,15,19,99}/book.csv
Ответ 4
Пример 1:
Чтение одного файла CSV. Укажите полный путь к файлу:
val df = spark.read.option("header", "true").csv("C:spark\\sample_data\\tmp\\cars1.csv")
Пример 2:
Чтение нескольких файлов CSV с передачей имён:
val df=spark.read.option("header","true").csv("C:spark\\sample_data\\tmp\\cars1.csv", "C:spark\\sample_data\\tmp\\cars2.csv")
Пример 3:
Чтение нескольких файлов CSV с передачей списка имен:
val paths = List("C:spark\\sample_data\\tmp\\cars1.csv", "C:spark\\sample_data\\tmp\\cars2.csv")
val df = spark.read.option("header", "true").csv(paths: _*)
Пример 4:
Чтение нескольких файлов CSV в папке, игнорирование других файлов:
val df = spark.read.option("header", "true").csv("C:spark\\sample_data\\tmp\\*.csv")
Пример 5:
Чтение нескольких файлов CSV из нескольких папок:
val folders = List("C:spark\\sample_data\\tmp", "C:spark\\sample_data\\tmp1")
val df = spark.read.option("header", "true").csv(folders: _*)
Ответ 5
Используя Spark 2.0+, мы можем загружать несколько файлов CSV из разных каталогов, используя
df = spark.read.csv(['directory_1','directory_2','directory_3'.....], header=True)
. Для получения дополнительной информации см. Документацию
здесь