Параллельная карта /foreach в scala
У меня есть итерация vals: Iterable[T]
и долговременная функция без каких-либо соответствующих побочных эффектов: f: (T => Unit)
. Сейчас это применяется к vals
очевидным образом:
vals.foreach(f)
Я бы хотел, чтобы вызовы f
выполнялись одновременно (в разумных пределах). Есть ли очевидная функция где-то в базовой библиотеке Scala? Что-то вроде:
Concurrent.foreach(8 /* Number of threads. */)(vals, f)
В то время как f
достаточно долго работает, он достаточно короткий, что я не хочу накладных расходов на вызов потока для каждого вызова, поэтому я ищу что-то на основе пула потоков.
Ответы
Ответ 1
Мне нравится ответ Futures
. Однако, пока он будет выполняться одновременно, он также будет возвращаться асинхронно, что, вероятно, не является тем, что вы хотите. Правильный подход будет следующим:
import scala.actors.Futures._
vals map { x => future { f(x) } } foreach { _() }
Ответ 2
Scalaz имеет parMap
. Вы использовали бы его следующим образом:
import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive
Это будет оснащать каждый функтор (включая Iterable
) с помощью метода parMap
, поэтому вы можете просто сделать:
vals.parMap(f)
Вы также получаете parFlatMap
, parZipWith
и т.д.
Ответ 3
Многие ответы 2009 года по-прежнему используют старые scala.actors.Futures._, которые больше не находятся в более новом scala. Хотя Akka является предпочтительным способом, гораздо более читаемым способом является просто использовать параллельные коллекции (.par):
vals.foreach { v => f(v) }
становится
vals.par.foreach { v => f(v) }
В качестве альтернативы использование parMap может показаться более кратким, хотя с предостережением, которое вам нужно запомнить, чтобы импортировать обычный Scalaz *. Как обычно, существует более одного способа сделать то же самое в Scala!
Ответ 4
У меня возникли проблемы с использованием scala.actors.Futures в Scala 2.8 (при проверке я ошибся). Использование java libs напрямую работало для меня:
final object Parallel {
val cpus=java.lang.Runtime.getRuntime().availableProcessors
import java.util.{Timer,TimerTask}
def afterDelay(ms: Long)(op: =>Unit) = new Timer().schedule(new TimerTask {override def run = op},ms)
def repeat(n: Int,f: Int=>Unit) = {
import java.util.concurrent._
val e=Executors.newCachedThreadPool //newFixedThreadPool(cpus+1)
(0 until n).foreach(i=>e.execute(new Runnable {def run = f(i)}))
e.shutdown
e.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
}
}
Ответ 5
Я бы использовал scala.actors.Futures
:
vals.foreach(t => scala.actors.Futures.future(f(t)))
Ответ 6
В последней версии Functional Java есть некоторые функции concurrency более высокого порядка, которые вы можете использовать.
import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._
val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))
И затем...
par.parMap(vals, f)
Помните shutdown
pool
.
Ответ 7
Вы можете использовать Parallel Collections из стандартной библиотеки Scala.
Они похожи на обычные коллекции, но их операции выполняются параллельно. Вам просто нужно поставить вызов par
, прежде чем вы вызовете некоторые операции с коллекциями.
import scala.collection._
val array = new Array[String](10000)
for (i <- (0 until 10000).par) array(i) = i.toString