Выполните типизированное соединение в Scala с наборами данных Spark

Мне нравятся Spark Datasets, поскольку они дают мне ошибки анализа и синтаксические ошибки во время компиляции, а также позволяют мне работать с геттерами вместо жестко закодированных имен/чисел. Большинство вычислений можно выполнить с помощью высокоуровневых API-интерфейсов Datasets. Например, гораздо проще выполнять операции agg, select, sum, avg, map, filter или groupBy путем доступа к типизированным объектам набора данных, чем с использованием полей данных строк RDD.

Однако операция соединения отсутствует в этом, я читал, что могу сделать соединение, подобное этому

ds1.joinWith(ds2, ds1.toDF().col("key") === ds2.toDF().col("key"), "inner")

Но это не то, что я хочу, поскольку я бы предпочел сделать это через интерфейс класса case, поэтому что-то более похожее на это

ds1.joinWith(ds2, ds1.key === ds2.key, "inner")

Лучшая альтернатива для теперь, кажется, создает объект рядом с классом case и дает этим функциям предоставить мне правильное имя столбца как String. Поэтому я бы использовал первую строку кода, но вместо функции с жестко закодированным именем столбца помещал функцию. Но это не кажется достаточно элегантным.

Может ли кто-нибудь посоветовать мне другие варианты здесь? Цель состоит в том, чтобы иметь абстрагирование от фактических имен столбцов и работать предпочтительно с помощью геттеров класса case.

Я использую Spark 1.6.1 и Scala 2.10

Ответы

Ответ 1

наблюдение

Spark SQL может оптимизировать объединение только в том случае, если условие объединения основано на операторе равенства. Это означает, что мы можем рассматривать эквитионы и не equijoins отдельно.

эквисоединения

Equijoin может быть реализован безопасным образом типов путем сопоставления Datasets с Datasets (ключ, значение), выполнения соединения на основе ключей и изменения формы:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset

def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
    (f: T => K, g: U => K)
    (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
  val ds1_ = ds1.map(x => (f(x), x))
  val ds2_ = ds2.map(x => (g(x), x))
  ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}

Non-эквисоединения

Может быть выражен с использованием операторов реляционной алгебры как R ⋈θ S = σθ (R × S) и преобразован непосредственно в код.

Spark 2.0

Включите crossJoin и используйте joinWith с тривиально равным предикатом:

spark.conf.set("spark.sql.crossJoin.enabled", true)

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
                         (p: (T, U) => Boolean) = {
  ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}

Искра 2.1

Используйте метод crossJoin:

def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
    (p: (T, U) => Boolean)
    (implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
  ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}

Примеры

case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)

val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
  LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS

safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)

Заметки

  • Следует отметить, что эти методы качественно отличаются от прямого приложения joinWith и требуют дорогостоящих преобразований DeserializeToObject/SerializeFromObject (по сравнению с этим прямым joinWith может использовать логические операции над данными).

    Это похоже на поведение, описанное в Spark 2.0 Dataset и DataFrame.

  • Если вы не ограничены Spark SQL API без frameless интересные типы безопасных расширений для Datasets (на сегодняшний день его поддерживает только Spark 2.0):

    import frameless.TypedDataset
    
    val typedPoints1 = TypedDataset.create(points1)
    val typedPoints2 = TypedDataset.create(points2)
    
    typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
    
  • API-интерфейс Dataset API нестабилен в 1.6, поэтому я не думаю, что имеет смысл использовать его там.

  • Конечно, эти конструктивные и описательные имена не нужны. Вы можете легко использовать класс типа, чтобы неявно добавлять эти методы в Dataset и нет конфликта со встроенными сигнатурами, так что оба они могут быть вызваны joinWith.

Ответ 2

Кроме того, еще одна серьезная проблема для безопасного типа Spark API не так, так это при объединении двух Datasets, это даст вам DataFrame. И тогда вы теряете типы из двух исходных двух наборов данных.

val a: Dataset[A]
val b: Dataset[B]

val joined: Dataframe = a.join(b)
// what would be great is 
val joined: Dataset[C] = a.join(b)(implicit func: (A, B) => C)