Ответ 1
Я считаю, что это работает примерно так:
df
- это кадр данных с годами, месяцами и другими столбцами.
df.write.partitionBy('year', 'month').saveAsTable(...)
или
df.write.partitionBy('year', 'month').insertInto(...)
У меня есть пример приложения, работающего для чтения из файлов csv в dataframe. Информационный кадр можно хранить в таблице "Улов" в паркетном формате, используя метод
df.saveAsTable(tablename,mode)
.
Вышеприведенный код работает отлично, но у меня так много данных за каждый день, что я хочу динамически разбивать таблицу улья на основе createdate (столбец в таблице).
существует ли какой-либо способ динамического разбиения файловой рамки и хранения ее на склад хранилища. Хотите воздержаться от жесткого кодирования инструкции insert с помощью hivesqlcontext.sql(insert into table partittioin by(date)....)
.
Вопрос можно рассматривать как расширение: Как сохранить DataFrame непосредственно в Hive?
любая помощь очень ценится.
Я считаю, что это работает примерно так:
df
- это кадр данных с годами, месяцами и другими столбцами.
df.write.partitionBy('year', 'month').saveAsTable(...)
или
df.write.partitionBy('year', 'month').insertInto(...)
Мне удалось записать в секционированную таблицу hive с помощью df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")
Мне нужно было включить следующие свойства, чтобы заставить его работать.
hiveContext.setConf("hive.exec.dynamic.partition", "true") hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Я тоже сталкивался с тем же, но использовал следующие трюки, которые я разрешил.
Когда мы делаем любую таблицу как секционированную, то секционированный столбец становится чувствительным к регистру.
Сегментированный столбец должен присутствовать в DataFrame с тем же именем (с учетом регистра). Код:
var dbName="your database name"
var finaltable="your table name"
// First check if table is available or not..
if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
//If table is not available then it will create for you..
println("Table Not Present \n Creating table " + finaltable)
sparkSession.sql("use Database_Name")
sparkSession.sql("SET hive.exec.dynamic.partition = true")
sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID string,EMP_Name string,EMP_Address string,EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)")
//Table is created now insert the DataFrame in append Mode
df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
}