Параллельные операции над коллекциями Котлина?
В Scala можно легко выполнить параллельное отображение, для каждого и т.д., С помощью:
collection.par.map(..)
Есть ли эквивалент в Котлине?
Ответы
Ответ 1
Стандартная библиотека Kotlin не поддерживает параллельные операции. Однако, поскольку Kotlin использует стандартные классы коллекций Java, вы можете использовать потоковый API Java 8 для выполнения параллельных операций с коллекциями Kotlin.
например
myCollection.parallelStream()
.map { ... }
.filter { ... }
Ответ 2
Начиная с Kotlin 1.1, параллельные операции также можно выразить довольно элегантно с точки зрения сопрограмм. Вот pmap
в списках:
fun <A, B>List<A>.pmap(f: suspend (A) -> B): List<B> = runBlocking {
map { async(CommonPool) { f(it) } }.map { it.await() }
}
Обратите внимание, что сопрограммы все еще являются экспериментальной функцией.
Ответ 3
Официальной поддержки в Kotlin stdlib пока нет, но вы можете определить функцию расширения для имитации par.map
:
fun <T, R> Iterable<T>.pmap(
numThreads: Int = Runtime.getRuntime().availableProcessors() - 2,
exec: ExecutorService = Executors.newFixedThreadPool(numThreads),
transform: (T) -> R): List<R> {
// default size is just an inlined version of kotlin.collections.collectionSizeOrDefault
val defaultSize = if (this is Collection<*>) this.size else 10
val destination = Collections.synchronizedList(ArrayList<R>(defaultSize))
for (item in this) {
exec.submit { destination.add(transform(item)) }
}
exec.shutdown()
exec.awaitTermination(1, TimeUnit.DAYS)
return ArrayList<R>(destination)
}
(источник GitHub)
Вот простой пример использования
val result = listOf("foo", "bar").pmap { it+"!" }.filter { it.contains("bar") }
При необходимости это позволяет настроить многопоточность, указав количество потоков или даже определенный java.util.concurrent.Executor
. Например
listOf("foo", "bar").pmap(4, transform = { it + "!" })
Обратите внимание, что этот подход просто позволяет распараллелить операцию map
и не влияет на биты нисходящего потока. Например, filter
в первом примере будет работать однопоточным. Однако во многих случаях только преобразование данных (т. map
) требует распараллеливания. Кроме того, было бы просто распространить подход сверху на другие элементы API коллекции Kotlin.
Ответ 4
Начиная с версии 1.2, kotlin добавил функцию потока, совместимую с JRE8.
Таким образом, итерация по списку асинхронно может быть выполнена следующим образом:
fun main(args: Array<String>) {
val c = listOf("toto", "tata", "tutu")
c.parallelStream().forEach { println(it) }
}
Ответ 5
В настоящий момент нет. Официальное сравнение Kotlin с Scala упоминает:
Вещи, которые могут быть добавлены в Kotlin позже:
Ответ 6
Котлин хочет быть идиоматичным, но не слишком искусственным, чтобы его было трудно понять с первого взгляда.
Параллельные вычисления через Coroutines не являются исключением. Они хотят, чтобы это было легко, но не подразумевалось с помощью некоторого предварительно созданного метода, позволяющего выполнять вычисления при необходимости.
В твоем случае:
collection.map {
async{ produceWith(it) }
}
.forEach {
consume(it.await())
}
Обратите внимание, что для вызова async
и await
вы должны находиться внутри так называемого Context
, вы не можете делать приостановленные вызовы или запускать сопрограмму из не-сопрограммного контекста. Чтобы ввести один, вы можете:
-
runBlocking {/* your code here */}
: он приостановит текущий поток, пока не вернется лямбда- runBlocking {/* your code here */}
. -
GlobalScope.launch { }
: он будет выполнять лямбду параллельно; если ваш main
закончит работу, а у ваших сопрограмм не будет плохих вещей, в этом случае лучше использовать runBlocking
.
Надеюсь, что это может помочь :)