Ответ 1
Подпись
Разница лучше всего выделяется в подписях: Flow.map
принимает функцию, возвращающую тип T
, а Flow.mapAsync
принимает в функции, которая возвращает тип Future[T]
.
Практический пример
В качестве примера предположим, что у нас есть функция, которая запрашивает базу данных для полного имени пользователя на основе идентификатора пользователя:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
Учитывая значения Source
of UserID
, мы могли бы просто использовать Flow.map
в потоке для запроса базы данных и печати полных имен на консоли:
val userIDSource : Source[UserID, _] = ???
val stream =
userIDSource.via(Flow[UserID].map(databaseLookup))
.to(Sink.foreach[FullName](println))
.run()
Одно из ограничений этой реализации заключается в том, что этот поток будет обрабатывать только 1 бит запроса за раз. Это будет "узким местом" и, вероятно, будет препятствовать максимальной пропускной способности в нашем потоке. Чтобы повысить производительность, мы могли просто добавить concurrency, обернув databaseLookup
внутри Future
:
def concurrentDBLookup(userID : UserID) : Future[FullName] =
Future { databaseLookup(userID) }
val concurrentStream =
userIDSource.via(Flow[UserID].map(concurrentDBLookup))
.to(Sink.foreach[Future[FullName]](_ foreach println))
.run()
Проблема с этим упрощенным дополнением concurrency заключается в том, что мы эффективно устранили противодавление. Поскольку Sink просто втягивает будущее и добавляет foreach println
, который относительно быстр по сравнению с запросами базы данных, поток будет непрерывно распространять спрос на источник и вызывать больше фьючерсов. Это означает, что количество операций databaseLookup
не ограничено, что может в конечном итоге залить базу данных.
Flow.mapAsync
к спасению; мы можем иметь одновременный поиск db и одновременно ограничивать количество одновременных запросов:
val maxLookupCount = 10
val maxLookupConcurrentStream =
userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
.to(Sink.foreach[FullName](println))
.run()
Unordered Async Map
Если вы не заботитесь о том, чтобы упорядочить идентификаторы UserID на FullNames, вы можете использовать Flow.mapAsyncUnordered
. Это было бы полезно, если бы все, о чем вы заботились, это распечатать все полные имена, но не заботился о том, какой заказ они принесли на консоль.