Ответ 1
наблюдение
Spark SQL может оптимизировать объединение только в том случае, если условие объединения основано на операторе равенства. Это означает, что мы можем рассматривать эквитионы и не equijoins отдельно.
эквисоединения
Equijoin может быть реализован безопасным образом типов путем сопоставления Datasets
с Datasets
(ключ, значение), выполнения соединения на основе ключей и изменения формы:
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Dataset
def safeEquiJoin[T, U, K](ds1: Dataset[T], ds2: Dataset[U])
(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]) = {
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
}
Non-эквисоединения
Может быть выражен с использованием операторов реляционной алгебры как R ⋈θ S = σθ (R × S) и преобразован непосредственно в код.
Spark 2.0
Включите crossJoin
и используйте joinWith
с тривиально равным предикатом:
spark.conf.set("spark.sql.crossJoin.enabled", true)
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean) = {
ds1.joinWith(ds2, lit(true)).filter(p.tupled)
}
Искра 2.1
Используйте метод crossJoin
:
def safeNonEquiJoin[T, U](ds1: Dataset[T], ds2: Dataset[U])
(p: (T, U) => Boolean)
(implicit e1: Encoder[Tuple1[T]], e2: Encoder[Tuple1[U]], e3: Encoder[(T, U)]) = {
ds1.map(Tuple1(_)).crossJoin(ds2.map(Tuple1(_))).as[(T, U)].filter(p.tupled)
}
Примеры
case class LabeledPoint(label: String, x: Double, y: Double)
case class Category(id: Long, name: String)
val points1 = Seq(LabeledPoint("foo", 1.0, 2.0)).toDS
val points2 = Seq(
LabeledPoint("bar", 3.0, 5.6), LabeledPoint("foo", -1.0, 3.0)
).toDS
val categories = Seq(Category(1, "foo"), Category(2, "bar")).toDS
safeEquiJoin(points1, categories)(_.label, _.name)
safeNonEquiJoin(points1, points2)(_.x > _.x)
Заметки
-
Следует отметить, что эти методы качественно отличаются от прямого приложения
joinWith
и требуют дорогостоящих преобразованийDeserializeToObject
/SerializeFromObject
(по сравнению с этим прямымjoinWith
может использовать логические операции над данными).Это похоже на поведение, описанное в Spark 2.0 Dataset и DataFrame.
-
Если вы не ограничены Spark SQL API без
frameless
интересные типы безопасных расширений дляDatasets
(на сегодняшний день его поддерживает только Spark 2.0):import frameless.TypedDataset val typedPoints1 = TypedDataset.create(points1) val typedPoints2 = TypedDataset.create(points2) typedPoints1.join(typedPoints2, typedPoints1('x), typedPoints2('x))
-
API-интерфейс
Dataset
API нестабилен в 1.6, поэтому я не думаю, что имеет смысл использовать его там. -
Конечно, эти конструктивные и описательные имена не нужны. Вы можете легко использовать класс типа, чтобы неявно добавлять эти методы в
Dataset
и нет конфликта со встроенными сигнатурами, так что оба они могут быть вызваныjoinWith
.