Spark: какая лучшая стратегия для объединения RDD с 2 ключами с однократным RDD?
У меня есть два RDD, к которым я хочу присоединиться, и они выглядят так:
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
Случается, что ключевые значения rdd1
уникальны, а также то, что значения tuple-key rdd2
уникальны. Я хотел бы присоединиться к двум наборам данных, чтобы получить следующий rdd:
val rdd_joined:RDD[((T,W), (U,V))]
Какой самый эффективный способ достичь этого? Вот несколько идей, о которых я думал.
Вариант 1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
Вариант 2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
Вариант 1 будет собирать все данные для мастеринга, правильно? Таким образом, это не похоже на хороший вариант, если rdd1 большой (он относительно большой в моем случае, хотя на порядок меньше rdd2). Вариант 2 делает уродливый отчетливый и декартова продукт, который также кажется очень неэффективным. Еще одна возможность, которая пришла мне в голову (но еще не пробовала), - это сделать вариант 1 и трансляцию карты, хотя лучше было бы транслировать "умным" способом, чтобы ключи карты были расположены вместе с ключи rdd2
.
Кто-нибудь раньше сталкивался с подобной ситуацией? Я был бы рад получить ваши мысли.
Спасибо!
Ответы
Ответ 1
Один из вариантов заключается в том, чтобы выполнить широковещательное соединение, собирая rdd1
в драйвер и передавая его всем картографам; сделано правильно, это позволит нам избежать дорогого перетасовки большого rdd2
RDD:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
val m = rdd1Broadcast.value
for {
((t, w), u) <- iter
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)
preservesPartitioning = true
сообщает Спарку, что эта функция карты не изменяет ключи rdd2
; это позволит Spark избежать повторного разбиения на разделы rdd2
для любых последующих операций, которые соединяются на основе ключа (t, w)
.
Эта передача может быть неэффективной, поскольку она связана с узким местом связи в драйвере. В принципе, можно передавать один RDD другому без привлечения водителя; У меня есть прототип этого, который я бы хотел обобщить и добавить в Spark.
Другим вариантом является повторная карта ключей rdd2
и использование метода Spark join
; это будет включать полную перетасовку rdd2
(и, возможно, rdd1
):
rdd1.join(rdd2.map {
case ((t, w), u) => (t, (w, u))
}).map {
case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()
В моем примере ввода оба этих метода дают один и тот же результат:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
Третий вариант - реструктурировать rdd2
, чтобы t
был его ключом, а затем выполнил указанное выше соединение.
Ответ 2
Другой способ сделать это - создать пользовательский разделитель, а затем использовать zipPartitions для присоединения к вашим RDD.
import org.apache.spark.HashPartitioner
class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
override def getPartition(key: Any): Int = key match {
case k: Tuple2[Int, String] => super.getPartition(k._1)
case _ => super.getPartition(key)
}
}
val numSplits = 8
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))
val result = rdd2.zipPartitions(rdd1)(
(iter2, iter1) => {
val m = iter1.toMap
for {
((t: Int, w), u) <- iter2
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}
).partitionBy(new HashPartitioner(numSplits))
result.glom.collect