Как выполнить объединение на двух DataFrames с различным количеством столбцов в искровом?
У меня есть 2 DataFrame
следующим образом:
![Исходные данные]()
Мне нужен союз следующим образом:
![введите описание изображения здесь]()
Функция unionAll
не работает, потому что число и имя столбцов различны.
Как я могу это сделать?
Ответы
Ответ 1
В Scala вам просто нужно добавить все отсутствующие столбцы как nulls
.
import org.apache.spark.sql.functions._
// let df1 and df2 the Dataframes to merge
val df1 = sc.parallelize(List(
(50, 2),
(34, 4)
)).toDF("age", "children")
val df2 = sc.parallelize(List(
(26, true, 60000.00),
(32, false, 35000.00)
)).toDF("age", "education", "income")
val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union
def expr(myCols: Set[String], allCols: Set[String]) = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}
df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50| 2| null| null|
| 34| 4| null| null|
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+
Update
Оба временных DataFrames
будут иметь одинаковый порядок столбцов, потому что в обоих случаях мы отображаем через total
.
df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()
+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50| 2| null| null|
| 34| 4| null| null|
+---+--------+---------+------+
+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26| null| true|60000.0|
| 32| null| false|35000.0|
+---+--------+---------+-------+
Ответ 2
Очень простой способ сделать это - select
столбцы в том же порядке из обоих кадров данных и использовать unionAll
df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E'))\
.unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))
Ответ 3
Здесь решение pyspark.
Предполагается, что если поле в df1
отсутствует в df2
, то вы добавляете это отсутствующее поле в df2
с нулевыми значениями. Однако также предполагается, что если поле существует в обоих кадрах данных, но тип или обнуляемость поля различны, то эти два кадра данных конфликтуют и не могут быть объединены. В этом случае я поднимаю TypeError
.
from pyspark.sql.functions import lit
def harmonize_schemas_and_combine(df_left, df_right):
left_types = {f.name: f.dataType for f in df_left.schema}
right_types = {f.name: f.dataType for f in df_right.schema}
left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)
# First go over left-unique fields
for l_name, l_type, l_nullable in left_fields.difference(right_fields):
if l_name in right_types:
r_type = right_types[l_name]
if l_type != r_type:
raise TypeError, "Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type)
else:
raise TypeError, "Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not(l_nullable))
df_right = df_right.withColumn(l_name, lit(None).cast(l_type))
# Now go over right-unique fields
for r_name, r_type, r_nullable in right_fields.difference(left_fields):
if r_name in left_types:
l_type = left_types[r_name]
if r_type != l_type:
raise TypeError, "Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type)
else:
raise TypeError, "Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable))
df_left = df_left.withColumn(r_name, lit(None).cast(r_type))
# Make sure columns are in the same order
df_left = df_left.select(df_right.columns)
return df_left.union(df_right)
Ответ 4
Измененная версия Alberto Bonsanto для сохранения исходного порядка столбцов (OP подразумевает, что порядок должен соответствовать исходным таблицам). Кроме того, часть match
вызвала предупреждение Intellij.
Здесь моя версия:
def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {
val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union
val order = df1.columns ++ df2.columns
val sorted = total.toList.sortWith((a,b)=> order.indexOf(a) < order.indexOf(b))
def expr(myCols: Set[String], allCols: List[String]) = {
allCols.map( {
case x if myCols.contains(x) => col(x)
case y => lit(null).as(y)
})
}
df1.select(expr(cols1, sorted): _*).unionAll(df2.select(expr(cols2, sorted): _*))
}
Ответ 5
Вот код для Python 3.0 с помощью pyspark:
from pyspark.sql import SQLContext
import pyspark
from pyspark.sql.functions import lit
def __orderDFAndAddMissingCols(df, columnsOrderList, dfMissingFields):
''' return ordered dataFrame by the columns order list with null in missing columns '''
if not dfMissingFields: #no missing fields for the df
return df.select(columnsOrderList)
else:
columns = []
for colName in columnsOrderList:
if colName not in dfMissingFields:
columns.append(colName)
else:
columns.append(lit(None).alias(colName))
return df.select(columns)
def __addMissingColumns(df, missingColumnNames):
''' Add missing columns as null in the end of the columns list '''
listMissingColumns = []
for col in missingColumnNames:
listMissingColumns.append(lit(None).alias(col))
return df.select(df.schema.names + listMissingColumns)
def __orderAndUnionDFs( leftDF, rightDF, leftListMissCols, rightListMissCols):
''' return union of data frames with ordered columns by leftDF. '''
leftDfAllCols = __addMissingColumns(leftDF, leftListMissCols)
rightDfAllCols = __orderDFAndAddMissingCols(rightDF, leftDfAllCols.schema.names, rightListMissCols)
return leftDfAllCols.union(rightDfAllCols)
def unionDFs(leftDF,rightDF):
''' Union between two dataFrames, if there is a gap of column fields,
it will append all missing columns as nulls '''
# Check for None input
if leftDF == None:
raise ValueError('leftDF parameter should not be None')
if rightDF == None:
raise ValueError('rightDF parameter should not be None')
#For data frames with equal columns and order- regular union
if leftDF.schema.names == rightDF.schema.names:
return leftDF.union(rightDF)
else: # Different columns
#Save dataFrame columns name list as set
leftDFColList = set(leftDF.schema.names)
rightDFColList = set(rightDF.schema.names)
# Diff columns between leftDF and rightDF
rightListMissCols = list(leftDFColList - rightDFColList)
leftListMissCols = list(rightDFColList - leftDFColList)
return __orderAndUnionDFs(leftDF, rightDF, leftListMissCols, rightListMissCols)
if __name__ == '__main__':
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
leftDF = sqlContext.createDataFrame( [(1, 2, 11), (3, 4, 12)] , ('a','b','d'))
rightDF = sqlContext.createDataFrame( [(5, 6 , 9), (7, 8, 10)] , ('b','a','c'))
unionDF = unionDFs(leftDF,rightDF)
print(unionDF.select(unionDF.schema.names).show())
Ответ 6
У меня была такая же проблема, и использование соединения вместо объединения решило мою проблему.
Так, например, с python вместо этой строки кода:
result = left.union(right)
, который не будет выполнен для разного количества столбцов,
вы должны использовать этот:
result = left.join(right, left.columns if (len(left.columns) < len(right.columns)) else right.columns, "outer")
Обратите внимание, что второй аргумент содержит общие столбцы между двумя DataFrames. Если вы его не используете, результат будет иметь повторяющиеся столбцы, один из которых будет нулевым, а другой - нет.
Надеюсь, что это поможет.
Ответ 7
Существует много краткого пути решения этой проблемы с умеренной жертвой производительности.
def unionWithDifferentSchema(a: DataFrame, b: DataFrame): DataFrame = {
sparkSession.read.json(a.toJSON.union(b.toJSON).rdd)
}
Это функция, которая выполняет трюк. Использование toJSON для каждого блока данных делает json Union. Это сохраняет порядок и тип данных.
Только улов является toJSON относительно дорогим (однако не так много вы, вероятно, получите 10-15% -ное замедление). Однако это сохраняет код чистым.
Ответ 8
Вот моя версия Python:
from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row
def customUnion(df1, df2):
cols1 = df1.columns
cols2 = df2.columns
total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
def expr(mycols, allcols):
def processCols(colname):
if colname in mycols:
return colname
else:
return lit(None).alias(colname)
cols = map(processCols, allcols)
return list(cols)
appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
return appended
Вот пример использования:
data = [
Row(zip_code=58542, dma='MIN'),
Row(zip_code=58701, dma='MIN'),
Row(zip_code=57632, dma='MIN'),
Row(zip_code=58734, dma='MIN')
]
firstDF = spark.createDataFrame(data)
data = [
Row(zip_code='534', name='MIN'),
Row(zip_code='353', name='MIN'),
Row(zip_code='134', name='MIN'),
Row(zip_code='245', name='MIN')
]
secondDF = spark.createDataFrame(data)
customUnion(firstDF,secondDF).show()
Ответ 9
вот еще один:
def unite(df1: DataFrame, df2: DataFrame): DataFrame = {
val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = (cols1 ++ cols2).toSeq.sorted
val expr1 = total.map(c => {
if (cols1.contains(c)) c else "NULL as " + c
})
val expr2 = total.map(c => {
if (cols2.contains(c)) c else "NULL as " + c
})
df1.selectExpr(expr1:_*).union(
df2.selectExpr(expr2:_*)
)
}
Ответ 10
Объединение и внешнее объединение для объединения Pyspark DataFrame. Это работает для нескольких фреймов данных с разными столбцами.
def union_all(*dfs):
return reduce(ps.sql.DataFrame.unionAll, dfs)
def outer_union_all(*dfs):
all_cols = set([])
for df in dfs:
all_cols |= set(df.columns)
all_cols = list(all_cols)
print(all_cols)
def expr(cols, all_cols):
def append_cols(col):
if col in cols:
return col
else:
return sqlfunc.lit(None).alias(col)
cols_ = map(append_cols, all_cols)
return list(cols_)
union_df = union_all(*[df.select(expr(df.columns, all_cols)) for df in dfs])
return union_df