Удаление повторяющихся столбцов после объединения DF в Spark
Когда вы присоединяетесь к двум DF с похожими именами столбцов:
df = df1.join(df2, df1['id'] == df2['id'])
Соединение работает нормально, но вы не можете вызвать столбец id
, потому что он неоднозначен, и вы получите следующее исключение:
pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;"
Это делает id
непригодным для использования...
Следующая функция решает проблему:
def join(df1, df2, cond, how='left'):
df = df1.join(df2, cond, how=how)
repeated_columns = [c for c in df1.columns if c in df2.columns]
for col in repeated_columns:
df = df.drop(df2[col])
return df
Что мне не нравится в этом, так это то, что я должен перебирать имена столбцов и удалять их почему-то одним. Это выглядит действительно неуклюжим...
Знаете ли вы о каком-либо другом решении, которое будет более элегантно или удалять дубликаты, или удалять несколько столбцов без повторения каждого из них?
Ответы
Ответ 1
Если столбцы соединения в обоих кадрах данных имеют одинаковые имена, и вам нужно только equi join, вы можете указать столбцы объединения как список, и в этом случае результат будет содержать только один из столбцов соединения:
df1.show()
+---+----+
| id|val1|
+---+----+
| 1| 2|
| 2| 3|
| 4| 4|
| 5| 5|
+---+----+
df2.show()
+---+----+
| id|val2|
+---+----+
| 1| 2|
| 1| 3|
| 2| 4|
| 3| 5|
+---+----+
df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
| 1| 2| 2|
| 1| 2| 3|
| 2| 3| 4|
+---+----+----+
В противном случае вам нужно предоставить псевдоним фреймов соединений и снова обратиться к дублированным столбцам через псевдоним:
df1.alias("a").join(
df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
| 1| 2| 2|
| 1| 2| 3|
| 2| 3| 4|
+---+----+----+
Ответ 2
df.join(other, on, how)
, когда on
является строкой имени столбца или списком строк имен столбцов, возвращенный фрейм данных будет предотвращать дублирование столбцов.
когда on
является выражением соединения, оно приведет к дублированию столбцов. Мы можем использовать .drop(df.a)
для удаления дубликатов столбцов. Пример:
cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)
Ответ 3
Предполагая, что "a" является фреймом данных со столбцом "id", а "b" является другим фреймом данных со столбцом "id"
Я использую следующие два метода для удаления дубликатов:
Способ 1. Использование выражения String Join Expression вместо логического выражения. Это автоматически удалит дубликат столбца для вас
a.join(b, 'id')
Способ 2: переименование столбца до объединения и удаление его после
b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id)
Ответ 4
Код ниже работает с Spark 1.6.0 и выше.
salespeople_df.show()
+---+------+-----+
|Num| Name|Store|
+---+------+-----+
| 1| Henry| 100|
| 2| Karen| 100|
| 3| Paul| 101|
| 4| Jimmy| 102|
| 5|Janice| 103|
+---+------+-----+
storeaddress_df.show()
+-----+--------------------+
|Store| Address|
+-----+--------------------+
| 100| 64 E Illinos Ave|
| 101| 74 Grand Pl|
| 102| 2298 Hwy 7|
| 103|No address available|
+-----+--------------------+
Предполагая в этом примере, что имя общего столбца одинаков:
joined=salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()
+-----+---+------+--------------------+
|Store|Num| Name| Address|
+-----+---+------+--------------------+
| 100| 1| Henry| 64 E Illinos Ave|
| 100| 2| Karen| 64 E Illinos Ave|
| 101| 3| Paul| 74 Grand Pl|
| 102| 4| Jimmy| 2298 Hwy 7|
| 103| 5|Janice|No address available|
+-----+---+------+--------------------+
.join
предотвратит дублирование общего столбца.
Предположим, что вы хотите удалить столбец Num
в этом примере, вы можете просто использовать .drop('colname')
joined=joined.drop('Num')
joined.show()
+-----+------+--------------------+
|Store| Name| Address|
+-----+------+--------------------+
| 103|Janice|No address available|
| 100| Henry| 64 E Illinos Ave|
| 100| Karen| 64 E Illinos Ave|
| 101| Paul| 74 Grand Pl|
| 102| Jimmy| 2298 Hwy 7|
+-----+------+--------------------+
Ответ 5
После того, как я объединил несколько таблиц вместе, я пропустил их через простую функцию, чтобы отбрасывать столбцы в DF, если он встречается с дубликатами при ходьбе слева направо. Кроме того, вы также можете переименовать эти столбцы.
Где Names
- это таблица со столбцами ['Id', 'Name', 'DateId', 'Description']
, а Dates
- это таблица со столбцами ['Id', 'Date', 'Description']
, после объединения столбцы Id
и Description
будут дублированы.
Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")
Где dropDupeDfCols
определяется как:
def dropDupeDfCols(df):
newcols = []
dupcols = []
for i in range(len(df.columns)):
if df.columns[i] not in newcols:
newcols.append(df.columns[i])
else:
dupcols.append(i)
df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
df = df.drop(str(dupcol))
return df.toDF(*newcols)
Полученный фрейм данных будет содержать столбцы ['Id', 'Name', 'DateId', 'Description', 'Date']
.