Разница между картой и mapAsync

Может ли кто-нибудь объяснить мне разницу между map и mapAsync w.r.t потоком AKKA? В документации говорится, что

Преобразования потоков и побочные эффекты, связанные с внешним потоком основанные на услугах, могут выполняться с помощью mapAsync или mapAsyncUnordered

Почему мы не можем просто отобразить здесь карту? Я предполагаю, что поток, источник, раковина все будет монадическим по своей природе и, следовательно, карта должна работать нормально w.r.t Задержка в природе этих?

Ответы

Ответ 1

Подпись

Разница лучше всего выделяется в подписях: Flow.map принимает функцию, возвращающую тип T, а Flow.mapAsync принимает в функции, которая возвращает тип Future[T].

Практический пример

В качестве примера предположим, что у нас есть функция, которая запрашивает базу данных для полного имени пользователя на основе идентификатора пользователя:

type UserID = String
type FullName = String

val databaseLookup : UserID => FullName = ???  //implementation unimportant

Учитывая значения Source of UserID, мы могли бы просто использовать Flow.map в потоке для запроса базы данных и печати полных имен на консоли:

val userIDSource : Source[UserID, _] = ???

val stream = 
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()

Одно из ограничений этой реализации заключается в том, что этот поток будет обрабатывать только 1 бит запроса за раз. Это будет "узким местом" и, вероятно, будет препятствовать максимальной пропускной способности в нашем потоке. Чтобы повысить производительность, мы могли просто добавить concurrency, обернув databaseLookup внутри Future:

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
  Future { databaseLookup(userID) }

val concurrentStream = 
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()

Проблема с этим упрощенным дополнением concurrency заключается в том, что мы эффективно устранили противодавление. Поскольку Sink просто втягивает будущее и добавляет foreach println, который относительно быстр по сравнению с запросами базы данных, поток будет непрерывно распространять спрос на источник и вызывать больше фьючерсов. Это означает, что количество операций databaseLookup не ограничено, что может в конечном итоге залить базу данных.

Flow.mapAsync к спасению; мы можем иметь одновременный поиск db и одновременно ограничивать количество одновременных запросов:

val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()

Unordered Async Map

Если вы не заботитесь о том, чтобы упорядочить идентификаторы UserID на FullNames, вы можете использовать Flow.mapAsyncUnordered. Это было бы полезно, если бы все, о чем вы заботились, это распечатать все полные имена, но не заботился о том, какой заказ они принесли на консоль.