Параллельные операции над коллекциями Котлина?

В Scala можно легко выполнить параллельное отображение, для каждого и т.д., С помощью:

collection.par.map(..)

Есть ли эквивалент в Котлине?

Ответы

Ответ 1

Стандартная библиотека Kotlin не поддерживает параллельные операции. Однако, поскольку Kotlin использует стандартные классы коллекций Java, вы можете использовать потоковый API Java 8 для выполнения параллельных операций с коллекциями Kotlin.

например

myCollection.parallelStream()
        .map { ... }
        .filter { ... }

Ответ 2

Начиная с Kotlin 1.1, параллельные операции также можно выразить довольно элегантно с точки зрения сопрограмм. Вот pmap в списках:

fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
    map { async(CommonPool) { f(it) } }.map { it.await() }
}

Обратите внимание, что сопрограммы все еще являются экспериментальной функцией.

Ответ 3

Официальной поддержки в Kotlin stdlib пока нет, но вы можете определить функцию расширения для имитации par.map:

fun <T, R> Iterable<T>.pmap(
          numThreads: Int = Runtime.getRuntime().availableProcessors() - 2, 
          exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
          transform: (T) -> R): List<R> {

    // default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
    val defaultSize = if (this is Collection<*>) this.size else 10
    val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))

    for (item in this) {
        exec.submit { destination.add(transform(item)) }
    }

    exec.shutdown()
    exec.awaitTermination(1, TimeUnit.DAYS)

    return ArrayList<R>(destination)
}

(источник GitHub)

Вот простой пример использования

val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }

При необходимости это позволяет настроить многопоточность, указав количество потоков или даже определенный java.util.concurrent.Executor. Например

listOf("foo", "bar").pmap(4, transform = { it + "!" })

Обратите внимание, что этот подход просто позволяет распараллелить операцию map и не влияет на биты нисходящего потока. Например, filter в первом примере будет работать однопоточным. Однако во многих случаях только преобразование данных (т. map) требует распараллеливания. Кроме того, было бы просто распространить подход сверху на другие элементы API коллекции Kotlin.

Ответ 4

Начиная с версии 1.2, kotlin добавил функцию потока, совместимую с JRE8.

Таким образом, итерация по списку асинхронно может быть выполнена следующим образом:

fun main(args: Array<String>) {
  val c = listOf("toto", "tata", "tutu")
  c.parallelStream().forEach { println(it) }
}

Ответ 5

В настоящий момент нет. Официальное сравнение Kotlin с Scala упоминает:

Вещи, которые могут быть добавлены в Kotlin позже:

  • Параллельные коллекции

Ответ 6

Котлин хочет быть идиоматичным, но не слишком искусственным, чтобы его было трудно понять с первого взгляда.

Параллельные вычисления через Coroutines не являются исключением. Они хотят, чтобы это было легко, но не подразумевалось с помощью некоторого предварительно созданного метода, позволяющего выполнять вычисления при необходимости.

В твоем случае:

collection.map { 
        async{ produceWith(it) } 
    }
    .forEach { 
        consume(it.await()) 
    }

Обратите внимание, что для вызова async и await вы должны находиться внутри так называемого Context, вы не можете делать приостановленные вызовы или запускать сопрограмму из не-сопрограммного контекста. Чтобы ввести один, вы можете:

  • runBlocking {/* your code here */}: он приостановит текущий поток, пока не вернется лямбда- runBlocking {/* your code here */}.
  • GlobalScope.launch { }: он будет выполнять лямбду параллельно; если ваш main закончит работу, а у ваших сопрограмм не будет плохих вещей, в этом случае лучше использовать runBlocking.

Надеюсь, что это может помочь :)