Spark задает несколько условий столбца для объединения данных
Как предоставить больше условий столбца при объединении двух фреймов данных. Например, я хочу запустить следующее:
val Lead_all = Leads.join(Utm_Master,
Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")
Я хочу присоединиться только тогда, когда совпадают эти столбцы. Но выше синтаксис недействителен, так как cols принимает только одну строку. Итак, как мне получить то, что я хочу.
Ответы
Ответ 1
В этом случае есть ассоциация Spark API столбца/выражения:
Leaddetails.join(
Utm_Master,
Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
&& Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
&& Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
&& Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
"left"
)
Оператор <=>
в этом примере означает тест на равномерность для нулевых значений.
Основное отличие от простого теста Equality (===
) заключается в том, что первый безопасен для использования в случае, если один из столбцов может имеют нулевые значения.
Ответ 2
В версии Spark версии 1.5.0 (которая в настоящее время не издана) вы можете присоединиться к нескольким столбцам DataFrame. Обратитесь к SPARK-7990: добавьте методы, чтобы облегчить равноудаление нескольких ключей присоединения.
Python
Leads.join(
Utm_Master,
["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
"left_outer"
)
Scala
Вопрос задан для ответа Scala, но я не использую Scala. Вот мое лучшее предположение....
Leads.join(
Utm_Master,
Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left_outer"
)
Ответ 3
Одна вещь, которую вы можете сделать, это использовать raw SQL:
case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)
val bar = sqlContext.createDataFrame(sc.parallelize(
Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
Bar(3, 1, 2, "bar") :: Nil))
val foo = sqlContext.createDataFrame(sc.parallelize(
Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))
foo.registerTempTable("foo")
bar.registerTempTable("bar")
sqlContext.sql(
"SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")
Ответ 4
В Pyspark вы можете просто указать каждое условие отдельно:
val Lead_all = Leads.join(Utm_Master,
(Leaddetails.LeadSource == Utm_Master.LeadSource) &
(Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
(Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
(Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))
Не забудьте правильно использовать операторы и скобки.
Ответ 5
Scala:
Leaddetails.join(
Utm_Master,
Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
&& Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
&& Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
&& Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
"left"
)
Чтобы сделать регистр нечувствительным,
import org.apache.spark.sql.functions.{lower, upper}
то просто используйте lower(value)
в условии метода соединения.
Например: dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))
Ответ 6
Опции ===
дают мне дублированные столбцы. Поэтому я использую Seq
вместо этого.
val Lead_all = Leads.join(Utm_Master,
Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")
Конечно, это работает только тогда, когда имена соединяющихся столбцов совпадают.
Ответ 7
Spark SQL поддерживает соединение в кортеже столбцов, когда в круглых скобках, например
... WHERE (list_of_columns1) = (list_of_columns2)
что является способом, меньшим, чем указание равных выражений (=) для каждой пары столбцов, объединенных набором "AND" s.
Например:
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
)
вместо
SELECT a,b,c
FROM tab1 t1
WHERE
NOT EXISTS
( SELECT 1
FROM t1_except_t2_df e
WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
)
что менее читаемо, особенно если список столбцов большой, и вы хотите легко обращаться с NULL.
Ответ 8
Попробуйте это:
val rccJoin=dfRccDeuda.as("dfdeuda")
.join(dfRccCliente.as("dfcliente")
,col("dfdeuda.etarcid")===col("dfcliente.etarcid")
&& col("dfdeuda.etarcid")===col("dfcliente.etarcid"),"inner")