Отображение потока с функцией, возвращающей будущее
Я иногда оказываюсь в ситуации, когда у меня есть несколько Stream[X]
и function X => Future Y
, которые я хотел бы объединить с Future[Stream[Y]]
, и я не могу найти способ сделать это, Например, у меня есть
val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)
val result : Future[Stream[String]] = ???
Я пробовал
val result = Future.Traverse(x, toFutureString)
который дает правильный результат, но, кажется, поглощает весь поток перед возвращением Будущего, которое более или менее поражает пупс
Я пробовал
val result = x.flatMap(toFutureString)
но не компилируется с type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]
val result = x.map(toFutureString)
возвращает несколько нечетный и бесполезный Stream[Future[String]]
Что мне делать, чтобы все исправлено?
Изменить: я не застрял на Stream
, я был бы в равной степени счастлив с той же операцией на Iterator
, если он не будет блокировать оценку всех элементов, прежде чем приступать к обработке головы
Edit2: я не уверен на 100%, что конструкция Future.Traverse должна пересекать весь поток, прежде чем возвращать Future [Stream], но я думаю, что это так. Если это не так, это прекрасный ответ сам по себе.
Edit3: Мне тоже не нужно, чтобы результат был в порядке, я в порядке с потоком или итератором, возвращающимся в любой порядок.
Ответы
Ответ 1
Вы находитесь на правильном пути с traverse
, но, к сожалению, похоже, что стандартное определение библиотеки немного сломано в этом случае - ему не нужно будет потреблять поток перед возвратом.
Future.traverse
представляет собой конкретную версию гораздо более общей функции, которая работает на любом прикладном функторе, обернутом в "обходной" тип (см. эти papers или мой ответ здесь для получения дополнительной информации, например).
Библиотека Scalaz предоставляет эту более общую версию, и в этом случае она работает как ожидалось (обратите внимание, что я получаю экземпляр прикладного функтора для Future
из scalaz-contrib
, он еще не находится в стабильных версиях Scalaz, которые по-прежнему кросс-построены против Scala 2.9.2, который не имеет этого Future
):
import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._
import ExecutionContext.Implicits.global
def toFutureString(value: Int) = Future(value.toString)
val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString
Это немедленно возвращается к бесконечному потоку, поэтому мы точно знаем, что он не потребляет в первую очередь.
Как сноска: если вы посмотрите источник для Future.traverse
, вы увидите, что он реализован в терминах foldLeft
, что удобно, но не обязательно или целесообразно в случае потоков.
Ответ 2
Забывание о потоке:
import scala.concurrent.Future
import ExecutionContext.Implicits.global
val x = 1 to 10 toList
def toFutureString(value : Int) = Future {
println("starting " + value)
Thread.sleep(1000)
println("completed " + value)
value.toString
}
дает (на моем 8-ядерном ящике):
scala> Future.traverse(x)(toFutureString)
starting 1
starting 2
starting 3
starting 4
starting 5
starting 6
starting 7
starting 8
res12: scala.concurrent.Future[List[String]] = [email protected]
scala> completed 1
completed 2
starting 9
starting 10
completed 3
completed 4
completed 5
completed 6
completed 7
completed 8
completed 9
completed 10
Итак, 8 из них сразу же запускаются (по одному для каждого ядра, хотя это настраивается с помощью исполнителя threadpool), а затем, когда все больше завершаются. The Future [List [String]] немедленно возвращается, а затем после паузы начинает печатать эти сообщения "завершено x".
Пример использования этого может быть, когда у вас есть List [Url's] и функция типа Url = > Future [HttpResponseBody]. Вы можете вызвать Future.traverse в этом списке с помощью этой функции и параллельно запускать эти HTTP-запросы, возвращая единственное будущее, которое представляет список результатов.
Было что-то вроде того, что вы делали?
Ответ 3
Принятый ответ больше не действителен, так как современная версия Scalaz traverse()
ведет себя по-другому и пытается использовать весь поток во время вызова.
Что касается вопроса, я бы сказал, что добиться этого по-настоящему неблокирующим способом невозможно.
Future[Stream[Y]]
не может быть разрешено, пока не будет доступен Stream[Y]
. А так как Y
генерируется асинхронно функцией X => Future[Y]
вы не можете получить Y
не блокируя время, когда вы пересекаете Stream[Y]
. Это означает, что либо все Future[Y]
должны быть разрешены до разрешения Future[Stream[Y]]
(для которого требуется использование всего потока), либо вы должны разрешить появление блоков при прохождении Stream[Y]
(для элементов, базовое будущее которых) еще не завершены). Но если мы допустим блокировку обхода, каково будет определение итогового будущего? С этой точки зрения это может быть то же самое, что и Future.successful(BlockingStream[Y])
. Это, в свою очередь, семантически равно оригинальному Stream[Future[Y]]
.
Другими словами, я думаю, что есть проблема в самом вопросе.