Параллельный итератор в Scala
Возможно ли, используя Scala параллельные коллекции, чтобы распараллелить Iterator
без предварительной оценки?
Здесь я говорю о параллелизации функциональных преобразований на Iterator
, а именно map
и flatMap
.
Я думаю, что это требует предварительной оценки некоторых элементов Iterator
, а затем вычисления большего количества, как только некоторые из них будут потребляться через next
.
Все, что я мог найти, потребует, чтобы итератор был преобразован в Iterable
или Stream
в лучшем случае. Stream
затем получает полную оценку, когда я вызываю .par
на нем.
Я также приветствую предложения по реализации, если это не доступно. Реализации должны поддерживать параллельные map
и flatMap
.
Ответы
Ответ 1
Я понимаю, что это старый вопрос, но реализация ParIterator
в iterata библиотека делает то, что вы искали?
scala> import com.timgroup.iterata.ParIterator.Implicits._
scala> val it = (1 to 100000).toIterator.par().map(n => (n + 1, Thread.currentThread.getId))
scala> it.map(_._2).toSet.size
res2: Int = 8 // addition was distributed over 8 threads
Ответ 2
Лучше всего использовать стандартную библиотеку, возможно, не используя параллельные коллекции, но concurrent.Future.traverse
:
import concurrent._
import ExecutionContext.Implicits.global
Future.traverse(Iterator(1,2,3))(i => Future{ i*i })
хотя я думаю, что это выполнит все, начиная с самого начала.
Ответ 3
От ML, перемещая элементы итератора параллельно:
https://groups.google.com/d/msg/scala-user/q2NVdE6MAGE/KnutOq3iT3IJ
Я отошел по Future.traverse
по той же причине. В моем случае использования, поддерживая N рабочих мест, я получил код для дросселя, подающего контекст выполнения из очереди заданий.
Моя первая попытка заключалась в блокировании потока фидера, но это рисковало также блокировать задачи, которые хотели вызвать задачи в контексте выполнения. Что вы знаете, блокирование - это зло.
Ответ 4
Немного сложно точно следить за тем, что вы после, но, возможно, это примерно так:
val f = (x: Int) => x + 1
val s = (0 to 9).toStream map f splitAt(6) match {
case (left, right) => left.par; right
}
Это будет определять f на первых 6 элементах параллельно, а затем возвращать поток поверх остальных.