Как избежать дублирования столбцов после объединения?
У меня есть два фрейма данных со следующими столбцами:
df1.columns
// Array(ts, id, X1, X2)
и
df2.columns
// Array(ts, id, Y1, Y2)
После
val df_combined = df1.join(df2, Seq(ts,id))
Я получаю следующие столбцы: Array(ts, id, X1, X2, ts, id, Y1, Y2)
. Я мог ожидать, что общие столбцы будут удалены. Есть ли что-то, что необходимо сделать?
Ответы
Ответ 1
Простой ответ (из Часто задаваемые вопросы по данным по этому вопросу) заключается в том, чтобы выполнить объединение, в котором объединенные столбцы выражаются как массив строк (или одна строка) вместо предиката.
Ниже приведен пример, связанный с часто задаваемыми параметрами Databricks, но с двумя столбцами соединения, чтобы ответить на исходный вопрос о посте.
Вот кадр слева:
val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))
val left = llist.toDF("firstname","lastname","date","duration")
left.show()
/*
+---------+--------+----------+--------+
|firstname|lastname| date|duration|
+---------+--------+----------+--------+
| bob| b|2015-01-13| 4|
| alice| a|2015-04-23| 10|
+---------+--------+----------+--------+
*/
Вот кадр справа:
val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")
right.show()
/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
| alice| a| 100|
| bob| b| 23|
+---------+--------+------+
*/
Ниже приведено неверное решение, где столбцы соединения определяются как предикат left("firstname")===right("firstname") && left("lastname")===right("lastname")
.
Неправильный результат состоит в том, что столбцы firstname
и lastname
дублируются в объединенном фрейме данных:
left.join(right, left("firstname")===right("firstname") &&
left("lastname")===right("lastname")).show
/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname| date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
| bob| b|2015-01-13| 4| bob| b| 23|
| alice| a|2015-04-23| 10| alice| a| 100|
+---------+--------+----------+--------+---------+--------+------+
*/
Правильное решение состоит в том, чтобы определить столбцы соединения как массив строк Seq("firstname", "lastname")
. Кадр выходных данных не имеет дублированных столбцов:
left.join(right, Seq("firstname", "lastname")).show
/*
+---------+--------+----------+--------+------+
|firstname|lastname| date|duration|upload|
+---------+--------+----------+--------+------+
| bob| b|2015-01-13| 4| 23|
| alice| a|2015-04-23| 10| 100|
+---------+--------+----------+--------+------+
*/
Ответ 2
Это ожидаемое поведение. Метод DataFrame.join
эквивалентен SQL-соединению, как это
SELECT * FROM a JOIN b ON joinExprs
Если вы хотите игнорировать повторяющиеся столбцы, просто отбросьте их или выберите интересующие столбцы впоследствии. Если вы хотите устранить неоднозначность, вы можете использовать доступ к ним, используя родительские DataFrames
:
val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???
a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))
или используйте псевдонимы:
// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")
Для равных объединений существует специальный синтаксис ярлыков, который принимает либо последовательность строк:
val usingColumns: Seq[String] = ???
a.join(b, usingColumns)
или как одна строка
val usingColumn: String = ???
a.join(b, usingColumn)
которые содержат только одну копию столбцов, используемых в условии соединения.
Ответ 3
Я застрял в этом некоторое время, и только недавно я придумал решение, что довольно легко.
Скажите, что a
scala> val a = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]
scala> a.show
+---+----+
|key|vala|
+---+----+
| a| 1|
| b| 2|
+---+----+
and
scala> val b = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]
scala> b.show
+---+----+
|key|valb|
+---+----+
| a| 1|
+---+----+
и я могу сделать это, чтобы выбрать только значение в dataframe a:
scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
| a| 1|
| b| 2|
+---+----+
Ответ 4
Вы можете просто использовать это
df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")
Здесь TYPE-OF-JOIN может быть
- оставил
- право
- внутренний
- fullouter
Например, у меня есть два кадра данных, как это:
// df1
word count1
w1 10
w2 15
w3 20
// df2
word count2
w1 100
w2 150
w5 200
Если вы присоединитесь к fullouter, то результат выглядит так
df1.join(df2, Seq("word"),"fullouter").show()
word count1 count2
w1 10 100
w2 15 150
w3 20 null
w5 null 200
Ответ 5
Это нормальное поведение SQL, что я делаю для этого:
- Отбрасывать или переименовывать исходные столбцы
- Сделайте соединение
- Переименовать столбцы, если есть
Здесь я заменяю столбец "fullname":
Некоторый код в Java:
this
.sqlContext
.read()
.parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
.drop("fullname")
.registerTempTable("data_original");
this
.sqlContext
.read()
.parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
.registerTempTable("data_v2");
this
.sqlContext
.sql(etlQuery)
.repartition(1)
.write()
.mode(SaveMode.Overwrite)
.parquet(outputPath);
Если запрос:
SELECT
d.*,
concat_ws('_', product_name, product_module, name) AS fullname
FROM
{table_source} d
LEFT OUTER JOIN
{table_updates} u ON u.id = d.id
Это то, что вы можете сделать только с Spark, я верю (drop column из списка), очень полезно!
Ответ 6
Выполните "левое" соединение для NeededDatasetForColumns и NotNeededColumnsDataset. Он не будет дублировать столбцы из NotNeededColumnsDataset.
Смотрите здесь для ссылки: http://www.techburps.com/misc/apache-spark-dataset-joins-in-java/129
Ответ 7
попробуй это,
val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))
Ответ 8
Лучше всего перед тем, как присоединять их к именам столбцов в DF, и отбрасывать их соответствующим образом.
df1.columns = [id, возраст, доход]
df2.column = [id, age_group]
df1.join(df2, on = df1.id == df2.id, how = 'inner'). write.saveAsTable('table_name')
//вернет ошибку, а ошибка для повторяющихся столбцов
//вместо этого попробуйте это
df1.join(df2.withColumnRenamed('id', 'id_2'), on = df1.id == df2.id_2, how = 'inner'). drop ('id_2')
Ответ 9
После объединения нескольких таблиц я запускаю их с помощью простой функции, чтобы переименовать столбцы в DF, если он встречается с дубликатами. Кроме того, вы также можете удалить эти дубликаты столбцов.
Где Names
- это таблица со столбцами ['Id', 'Name', 'DateId', 'Description']
, а Dates
- это таблица со столбцами ['Id', 'Date', 'Description']
, после объединения столбцы Id
и Description
будут дублированы.
Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = deDupeDfCols(NamesAndDates, '_')
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")
Где deDupeDfCols
определяется как:
def deDupeDfCols(df, separator=''):
newcols = []
for col in df.columns:
if col not in newcols:
newcols.append(col)
else:
for i in range(2, 1000):
if (col + separator + str(i)) not in newcols:
newcols.append(col + separator + str(i))
break
return df.toDF(*newcols)
Полученный фрейм данных будет содержать столбцы ['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2']
.
Извиняюсь за этот ответ на Python - я не знаком со Scala, но этот вопрос возник, когда я погуглил эту проблему, и я уверен, что код Scala не слишком отличается.