Ответ 1
Поскольку ваш вопрос довольно смутно, подумайте об общих стратегиях, которые могут быть использованы для решения этой проблемы.
Стандартное решение здесь будет кэшировать, но поскольку вы явно хотите его избежать, я предполагаю здесь некоторые дополнительные ограничения. Это предполагает, что некоторые подобные решения, такие как
также неприемлемы. Это означает, что вам нужно найти некоторые из них для управления самим трубопроводом.
Хотя множественные преобразования могут быть сжаты вместе, каждое преобразование создает новый RDD. Это, в сочетании с вашим выражением о кэшировании, устанавливает относительно сильные ограничения на возможные решения.
Начнем с самого простого случая, когда все конвейеры могут быть выражены одноступенчатыми заданиями. Это ограничивает наш выбор отображением только заданий и простых сокращений по карте (например, описанных в вашем вопросе). Такие трубопроводы можно легко выразить как последовательность операций на локальных итераторах. Итак, следующее
import org.apache.spark.util.StatCounter
def isEven(x: Long) = x % 2 == 0
def isOdd(x: Long) = !isEven(x)
def p1(rdd: RDD[Long]) = {
rdd
.filter(isEven _)
.aggregate(StatCounter())(_ merge _, _ merge _)
.mean
}
def p2(rdd: RDD[Long]) = {
rdd
.filter(isOdd _)
.reduce(_ + _)
}
может быть выражена как:
def p1(rdd: RDD[Long]) = {
rdd
.mapPartitions(iter =>
Iterator(iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)))
.collect
.reduce(_ merge _)
.mean
}
def p2(rdd: RDD[Long]) = {
rdd
.mapPartitions(iter =>
Iterator(iter.filter(isOdd _).foldLeft(0L)(_ + _)))
.collect
.reduce(_ + _)
// identity _
}
В этот момент мы можем переписать отдельные задания следующим образом:
def mapPartitions2[T, U, V](rdd: RDD[T])(f: Iterator[T] => U, g: Iterator[T] => V) = {
rdd.mapPartitions(iter => {
val items = iter.toList
Iterator((f(items.iterator), g(items.iterator)))
})
}
def reduceLocally2[U, V](rdd: RDD[(U, V)])(f: (U, U) => U, g: (V, V) => V) = {
rdd.collect.reduce((x, y) => (f(x._1, y._1), g(x._2, y._2)))
}
def evaluate[U, V, X, Z](pair: (U, V))(f: U => X, g: V => Z) = (f(pair._1), g(pair._2))
val rdd = sc.range(0L, 100L)
def f(iter: Iterator[Long]) = iter.filter(isEven _).foldLeft(StatCounter())(_ merge _)
def g(iter: Iterator[Long]) = iter.filter(isOdd _).foldLeft(0L)(_ + _)
evaluate(reduceLocally2(mapPartitions2(rdd)(f, g))(_ merge _, _ + _))(_.mean, identity)
Самая большая проблема здесь заключается в том, что мы должны с готовностью оценивать каждый раздел, чтобы иметь возможность применять отдельные конвейеры. Это означает, что общие требования к памяти могут быть значительно выше по сравнению с той же самой логикой, применяемой отдельно. Без кеширования * это также бесполезно в случае многоступенчатых заданий.
Альтернативным решением является обработка данных по элементам, но обработка каждого элемента в виде кортежей seqs:
def map2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(f: T => V, g: U => X) = {
rdd.map{ case (ts, us) => (ts.map(f), us.map(g)) }
}
def filter2[T, U](rdd: RDD[(Seq[T], Seq[U])])(
f: T => Boolean, g: U => Boolean) = {
rdd.map{ case (ts, us) => (ts.filter(f), us.filter(g)) }
}
def aggregate2[T, U, V, X](rdd: RDD[(Seq[T], Seq[U])])(zt: V, zu: X)
(s1: (V, T) => V, s2: (X, U) => X, m1: (V, V) => V, m2: (X, X) => X) = {
rdd.mapPartitions(iter => {
var accT = zt
var accU = zu
iter.foreach { case (ts, us) => {
accT = ts.foldLeft(accT)(s1)
accU = us.foldLeft(accU)(s2)
}}
Iterator((accT, accU))
}).reduce { case ((v1, x1), (v2, x2)) => ((m1(v1, v2), m2(x1, x2))) }
}
С API, подобным этому, мы можем выразить исходные конвейеры как:
val rddSeq = rdd.map(x => (Seq(x), Seq(x)))
aggregate2(filter2(rddSeq)(isEven, isOdd))(StatCounter(), 0L)(
_ merge _, _ + _, _ merge _, _ + _
)
Этот подход немного более мощный, чем первый (вы можете легко реализовать некоторое подмножество методов byKey
, если это необходимо), а требования к памяти в типичных конвейерах должны быть сопоставимы с основным API, но это также значительно более навязчиво.
* Вы можете проверить ответ, предоставленный eje для примеров мультиплексирования.