Как определить пользовательский разделитель для Spark RDD с одинаковым размером раздела, где каждый раздел имеет одинаковое количество элементов?
Я новичок в Spark. У меня есть большой набор данных из элементов [RDD], и я хочу разделить его на два точно равных размера разделов, поддерживающих порядок элементов. Я пробовал использовать RangePartitioner
как
var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
Это не дает удовлетворительного результата, потому что он делит грубый, но не точно равный размер, поддерживающий порядок элементов.
Например, если имеется 64 элемента, мы используем
RangePartitioner
, то он делит на 31 элемент и 33 элемента.
Мне нужен разделитель, так что я получаю ровно первые 32 элемента в одной половине, а другая половина содержит второй набор из 32 элементов.
Не могли бы вы помочь мне, предложив использовать настраиваемый разделитель таким образом, чтобы я получал одинаковые по размеру две половины, поддерживая порядок элементов?
Ответы
Ответ 1
Partitioner
работать, назначив ключ для раздела. Вам понадобится предварительное знание распределения ключей или просмотр всех ключей, чтобы сделать такой разделитель. Вот почему Spark не предоставляет вам один.
В общем, вам не нужен такой разделитель. На самом деле я не могу придумать вариант использования, где мне нужны равные по размеру разделы. Что делать, если количество элементов нечетно?
В любом случае, скажем, у вас есть RDD с ключом Int
s, и вы знаете, сколько всего. Затем вы можете написать пользовательский Partitioner
, как это:
class ExactPartitioner[V](
partitions: Int,
elements: Int)
extends Partitioner {
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[Int]
// `k` is assumed to go continuously from 0 to elements-1.
return k * partitions / elements
}
}
Ответ 2
Этот ответ вдохновлен Даниэлем, но обеспечивает полную реализацию (используя модный шаблон моей жены) с примером для копирования и вставки людей:)
import RDDConversions._
trait RDDWrapper[T] {
def rdd: RDD[T]
}
// TODO View bounds are deprecated, should use context bounds
// Might need to change ClassManifest for ClassTag in spark 1.0.0
case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
// Here we use a single Long to try to ensure the sort is balanced,
// but for really large dataset, we may want to consider
// using a tuple of many Longs or even a GUID
def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
.grouped(numPartitions).map(t => (t._1._1, t._2))
}
case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
def grouped(size: Int): RDD[T] = {
// TODO Version where withIndex is cached
val withIndex = rdd.mapPartitions(_.zipWithIndex)
val startValues =
withIndex.mapPartitionsWithIndex((i, iter) =>
Iterator((i, iter.toIterable.last))).toArray().toList
.sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)
withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
case (value, index) => (startValues(i) + index.toLong, value)
})
.partitionBy(new Partitioner {
def numPartitions: Int = size
def getPartition(key: Any): Int =
(key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
})
.map(_._2)
}
}
Тогда в другом файле
// TODO modify above to be implicit class, rather than have implicit conversions
object RDDConversions {
implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] =
new RichRDD[T](rdd)
implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
}
Затем для вашего случая использования вы просто хотите (если он уже отсортирован)
import RDDConversions._
yourRdd.grouped(2)
Отказ от ответственности: не проверен, просто написал это прямо в ответ SO