API-интерфейс Spark Dataset - присоединиться

Я пытаюсь использовать API-интерфейс Spark Dataset, но у меня возникают некоторые проблемы, связанные с простым объединением.

Скажем, у меня есть два набора данных с полями: date | value date | value, то в случае DataFrame мое соединение будет выглядеть так:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

Однако для Dataset существует метод .joinWith, но такой же подход не работает:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

Каков аргумент, требуемый .joinWith?

Ответы

Ответ 1

Чтобы использовать joinWith вам сначала нужно создать DataSet, и, скорее всего, два из них. Чтобы создать DataSet, вам нужно создать класс case, соответствующий вашей схеме, и вызвать DataFrame.as[T] где T - ваш класс case. Так:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

Вы также можете пропустить класс case и использовать кортеж:

val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Тогда, если у вас есть другой класс case/DF, вот так:

case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Тогда, когда синтаксис join и joinWith схожи, результаты разные:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

Как вы можете видеть, joinWith оставляет объекты неповрежденными как части кортежа, а join выравнивает столбцы в одно пространство имен. (Это вызовет проблемы в приведенном выше случае, потому что имя столбца "ключ" повторяется.)

Любопытно, что я должен использовать df.col("key") и df2.col("key") для создания условий для соединения ds и ds2 - если вы используете только col("key") с каждой стороны, он делает не работает, а ds.col(...) не существует. Однако использование оригинала df.col("key") делает трюк.

Ответ 3

В приведенном выше примере вы можете попробовать ниже вариант -

  • Определите класс case для вашего вывода

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Присоедините два набора данных с помощью "Seq (" ключ ")", это поможет вам избежать двух повторяющихся столбцов ключа на выходе. Это поможет применить класс case или получить данные на следующем шаге

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+