Как разделить RDD на два или более RDD?
Я ищу способ разделить RDD на два или более RDD. Самое близкое, что я видел, это Scala Искра: Разделить коллекцию на несколько RDD?, которая по-прежнему является единственным RDD.
Если вы знакомы с SAS, что-то вроде этого:
data work.split1, work.split2;
set work.preSplit;
if (condition1)
output work.split1
else if (condition2)
output work.split2
run;
что привело к двум различным наборам данных. Он должен быть немедленно сохранен, чтобы получить результаты, которые я намерен...
Ответы
Ответ 1
Невозможно получить несколько RDD из одного преобразования *. Если вы хотите разделить RDD, вы должны применить filter
для каждого условия разделения. Например:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Если у вас есть только двоичное условие, и вычисления дороги, вы можете предпочесть что-то вроде этого:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
Это означает только одно вычисление предикатов, но требует дополнительного прохождения по всем данным.
Важно отметить, что до тех пор, пока входной RDD должным образом кэшируется, и нет никаких дополнительных предположений относительно распределения данных, нет существенной разницы, когда дело доходит до временной сложности между повторным фильтром и циклом с вложенным if-else.
С N элементами и условиями M число операций, которые вы должны выполнить, явно пропорционально N раз M. В случае петли для петли он должен быть ближе к (N + MN)/2, а повторный фильтр - это точно NM, но при В конце дня это не что иное, как O (NM). Вы можете увидеть мое обсуждение ** с Джейсоном Лендерманом, чтобы прочитать о некоторых плюсах и минусах.
На очень высоком уровне вы должны рассмотреть две вещи:
-
Преобразования искры ленивы, пока вы не выполните действие, которое ваше RDD не реализовано
Почему это имеет значение? Вернемся к моему примеру:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
Если позже я решит, что мне нужно только rdd_odd
, тогда нет причин для материализации rdd_even
.
Если вы посмотрите на пример SAS для вычисления work.split2
, вам необходимо материализовать как входные данные, так и work.split1
.
-
RDD предоставляют декларативный API. Когда вы используете filter
или map
, это полностью зависит от двигателя Spark, как эта операция выполняется. Пока функции, переданные преобразованиям, свободны от побочных эффектов, он создает множество возможностей для оптимизации всего конвейера.
В конце дня этот случай не является особым, чтобы оправдать собственное преобразование.
Эта карта с шаблоном фильтра фактически используется в ядре Spark. См. Мой ответ на Как Sparks RDD.randomSplit фактически разделяет RDD и релевантная часть метода randomSplit
.
Если единственная цель состоит в достижении разделения на входе, то можно использовать предложение partitionBy
для DataFrameWriter
, формат текста:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
* В Spark есть только 3 основных типа преобразований:
- RDD [T] = > RDD [T]
- RDD [T] = > RDD [U]
- (RDD [T], RDD [U]) = > RDD [W]
где T, U, W могут быть либо атомными типами, либо products/tuples (K, V). Любая другая операция должна быть выражена с использованием некоторой комбинации вышеизложенного. Вы можете проверить исходную документацию RDD для получения более подробной информации.
** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
*** См. также Scala Искра: разделите коллекцию на несколько RDD?
Ответ 2
В качестве других плакатов, упомянутых выше, нет единого встроенного RDD-преобразования, которое разделяет RDD, но здесь есть некоторые "мультиплексные" операции, которые могут эффективно эмулировать большое разнообразие "расщепления" на RDD без чтения нескольких раз:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.rdd.multiplex.MuxRDDFunctions
Некоторые методы, характерные для случайного расщепления:
http://silex.freevariable.com/latest/api/#com.redhat.et.silex.sample.split.SplitSampleRDDFunctions
Способы доступны из проекта silex с открытым исходным кодом:
https://github.com/willb/silex
Сообщение в блоге, объясняющее, как они работают:
http://erikerlandson.github.io/blog/2016/02/08/efficient-multiplexing-for-spark-rdds/
def muxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[U],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => Iterator.single(itr.next()(j)) } }
}
def flatMuxPartitions[U :ClassTag](n: Int, f: (Int, Iterator[T]) => Seq[TraversableOnce[U]],
persist: StorageLevel): Seq[RDD[U]] = {
val mux = self.mapPartitionsWithIndex { case (id, itr) =>
Iterator.single(f(id, itr))
}.persist(persist)
Vector.tabulate(n) { j => mux.mapPartitions { itr => itr.next()(j).toIterator } }
}
Как упоминалось в других источниках, эти методы включают в себя компромисс памяти для скорости, потому что они работают, вычисляя целые результаты разделов "нетерпеливо", а не "лениво". Таким образом, эти методы могут возникать в задачах памяти на больших разделах, где более традиционные ленивые преобразования не будут.
Ответ 3
Если вы разделите RDD с помощью randomSplit API, вы получите массив RDD.
Если вы хотите вернуть 5 RDD, переходите к 5 значениям веса.
например.
val sourceRDD = val sourceRDD = sc.parallelize(1 to 100, 4)
val seedValue = 5
val splitRDD = sourceRDD.randomSplit(Array(1.0,1.0,1.0,1.0,1.0), seedValue)
splitRDD(1).collect()
res7: Array[Int] = Array(1, 6, 11, 12, 20, 29, 40, 62, 64, 75, 77, 83, 94, 96, 100)
Ответ 4
Один из способов - использовать пользовательский разделитель для разделения данных в зависимости от вашего условия фильтра. Это может быть достигнуто путем расширения Partitioner
и реализации чего-то похожего на RangePartitioner
.
Затем можно разбивать разбиения на карты для создания нескольких RDD из секционированного RDD без чтения всех данных.
val filtered = partitioned.mapPartitions { iter => {
new Iterator[Int](){
override def hasNext: Boolean = {
if(rangeOfPartitionsToKeep.contains(TaskContext.get().partitionId)) {
false
} else {
iter.hasNext
}
}
override def next():Int = iter.next()
}
Просто имейте в виду, что количество разделов в отфильтрованных RDD будет таким же, как число в секционированном RDD, поэтому для уменьшения этого и сокращения пустых разделов следует использовать коалесценцию.