Параллельная карта /foreach в scala

У меня есть итерация vals: Iterable[T] и долговременная функция без каких-либо соответствующих побочных эффектов: f: (T => Unit). Сейчас это применяется к vals очевидным образом:

vals.foreach(f)

Я бы хотел, чтобы вызовы f выполнялись одновременно (в разумных пределах). Есть ли очевидная функция где-то в базовой библиотеке Scala? Что-то вроде:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)

В то время как f достаточно долго работает, он достаточно короткий, что я не хочу накладных расходов на вызов потока для каждого вызова, поэтому я ищу что-то на основе пула потоков.

Ответы

Ответ 1

Мне нравится ответ Futures. Однако, пока он будет выполняться одновременно, он также будет возвращаться асинхронно, что, вероятно, не является тем, что вы хотите. Правильный подход будет следующим:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }

Ответ 2

Scalaz имеет parMap. Вы использовали бы его следующим образом:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive

Это будет оснащать каждый функтор (включая Iterable) с помощью метода parMap, поэтому вы можете просто сделать:

vals.parMap(f)

Вы также получаете parFlatMap, parZipWith и т.д.

Ответ 3

Многие ответы 2009 года по-прежнему используют старые scala.actors.Futures._, которые больше не находятся в более новом scala. Хотя Akka является предпочтительным способом, гораздо более читаемым способом является просто использовать параллельные коллекции (.par):

vals.foreach { v => f(v) }

становится

vals.par.foreach { v => f(v) }

В качестве альтернативы использование parMap может показаться более кратким, хотя с предостережением, которое вам нужно запомнить, чтобы импортировать обычный Scalaz *. Как обычно, существует более одного способа сделать то же самое в Scala!

Ответ 4

У меня возникли проблемы с использованием scala.actors.Futures в Scala 2.8 (при проверке я ошибся). Использование java libs напрямую работало для меня:

final object Parallel {
  val cpus=java.lang.Runtime.getRuntime().availableProcessors
  import java.util.{Timer,TimerTask}
  def afterDelay(ms: Long)(op: =>Unit) = new Timer().schedule(new TimerTask {override def run = op},ms)
  def repeat(n: Int,f: Int=>Unit) = {
    import java.util.concurrent._
    val e=Executors.newCachedThreadPool //newFixedThreadPool(cpus+1)
    (0 until n).foreach(i=>e.execute(new Runnable {def run = f(i)}))
    e.shutdown
    e.awaitTermination(Math.MAX_LONG, TimeUnit.SECONDS)
  }
}

Ответ 5

Я бы использовал scala.actors.Futures:

vals.foreach(t => scala.actors.Futures.future(f(t)))

Ответ 6

В последней версии Functional Java есть некоторые функции concurrency более высокого порядка, которые вы можете использовать.

import fjs.F._
import fj.control.parallel.Strategy._
import fj.control.parallel.ParModule._
import java.util.concurrent.Executors._

val pool = newCachedThreadPool
val par = parModule(executorStrategy[Unit](pool))

И затем...

par.parMap(vals, f)

Помните shutdown pool.

Ответ 7

Вы можете использовать Parallel Collections из стандартной библиотеки Scala. Они похожи на обычные коллекции, но их операции выполняются параллельно. Вам просто нужно поставить вызов par, прежде чем вы вызовете некоторые операции с коллекциями.

import scala.collection._

val array = new Array[String](10000)
for (i <- (0 until 10000).par) array(i) = i.toString