Правильное использование пулов подключения клиентов "Akka http"
Мне нужно использовать REST-сервис с использованием Akka HTTP-клиента (v2.0.2). Логический подход заключается в том, чтобы сделать это через пул соединений хоста, потому что мы ожидаем большое количество одновременных соединений. Flow
для этого потребляет (HttpRequest, T)
и возвращает (Try[HttpResponse, T)
. Документация указывает, что некоторый произвольный тип T
необходим для управления потенциальными неупорядоченными ответами на запросы, но не указывает, что должен делать вызывающий объект с возвращенным T
Моя первая попытка - функция ниже, используя Int
как T
Он вызывается из многих мест, чтобы гарантировать, что соединения используют один пул.
val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))
def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
val unique = Random.nextInt
Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), 'unique') ⇒ Future.successful(r)
case (Failure(f), 'unique') ⇒ Future.failed(f)
case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
}
}
Вопрос в том, как клиент должен использовать этот T
? Есть ли более чистое, более эффективное решение? И, наконец, моя паранойя, что что-то может выйти из строя, на самом деле не паранойя?
Ответы
Ответ 1
Я был немного смущен этим сам изначально, пока не прочитал несколько документов. Если вы собираетесь использовать одиночные запросы в пул, независимо от того, сколько разных мест использует тот же пул, T
, который вы поставляете (Int
в вашем случае), не имеет значения. Поэтому, если вы все время используете Source.single
, этот ключ всегда может быть 1
, если вы действительно этого хотите.
Если он действительно вступает в игру, это если часть кода собирается использовать пул и одновременно отправлять несколько запросов в пул и хочет получить ответы от всех этих запросов. Причина в том, что ответы возвращаются в том порядке, в котором они были получены от вызванной службы, а не в порядке, в котором они были отправлены в пул. Каждый запрос может занимать разные промежутки времени, поэтому они перемещаются вниз по течению к Sink
в том порядке, в котором они были получены обратно из пула.
Скажем, у нас была служба, которая принимала запросы GET
с URL-адресом в форме:
/product/123
Где часть 123
- это идентификатор продукта, который вы искали. Если бы я хотел искать продукты 1-10
все сразу, с отдельным запросом для каждого, это то, где идентификатор становится важным, поэтому я могу сопоставить каждый HttpResponse
с идентификатором продукта, для которого он предназначен. Пример упрощенного кода для этого сценария будет следующим:
val requests = for(id <- 1 until 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)
val responsesMapFut:Future[Map[Int,HttpResponse]] =
Source(requests).
via(pool).
runFold(Map.empty[Int,HttpResponse]){
case (m, (util.Success(resp), id)) =>
m ++ Map(id -> resp)
case (m, (util.Failure(ex), i)) =>
//Log a failure here probably
m
}
Когда я получаю ответы в fold
, у меня также есть идентификатор, с которым каждый связан, поэтому я могу добавить их в мой Map
, который вводится с помощью id. Без этой функциональности я, вероятно, должен был бы сделать что-то вроде синтаксического анализа тела (если это был json), чтобы попытаться выяснить, какой ответ был тем, что и что не идеально, и это не охватывает случай сбоя. В этом решении я знаю, какие запросы не удались, потому что я все еще вернул идентификатор.
Я надеюсь, что это немного разъяснит вам.
Ответ 2
Пулы соединений Akka HTTP являются мощными союзниками при потреблении ресурсов на основе HTTP. Если вы собираетесь выполнять одиночные запросы одновременно, то решение:
def exec(req: HttpRequest): Future[HttpResponse] = {
Source.single(req → 1)
.via(pool)
.runWith(Sink.head).flatMap {
case (Success(r: HttpResponse), _) ⇒ Future.successful(r)
case (Failure(f), _) ⇒ Future.failed(f)
}
}
Поскольку вы выполняете запрос single
, нет необходимости устранять неоднозначность ответа. Однако потоки Акки умны. Одновременно вы можете отправлять несколько запросов в пул. В этом случае мы передаем Iterable[HttpRequest]
. Возвращаемый Iterable[HttpResponse]
переупорядочивается с помощью SortedMap
в том же порядке, что и исходные запросы. Вы можете просто сделать request zip response
, чтобы выровнять строки:
def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
}.map(r ⇒ r.values)
}
Фьючерсы итерируемых фьючерсов великолепны, если вам нужно распаковать вещи по-своему. Более простой ответ можно получить, просто сглаживая вещи.
def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
Source(requests.zipWithIndex.toMap)
.via(pool)
.runFold(SortedMap[Int, Future[HttpResponse]]()) {
case (m, (Success(r), idx)) ⇒ m + (idx → Future.successful(r))
case (m, (Failure(e), idx)) ⇒ m + (idx → Future.failed(e))
}.flatMap(r ⇒ Future.sequence(r.values))
}
Я сделал этот смысл со всеми импортерами и обертками, чтобы клиент мог использовать HTTP-сервисы.
Особая благодарность @cmbaxter за его опрятный пример.
Ответ 3
Существует открытый билет для улучшения документации по akka-http об этом. пожалуйста
проверьте этот пример
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
.via(pool)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise
val response = queue.offer(request).flatMap(buffered => {
if (buffered) promise.future
else Future.failed(new RuntimeException())
})
Ответ 4
Это действительно полезная статья о решениях и использовании Akka-Http Clients.
https://www.gregbeech.com/2018/04/08/akka-http-client-pooling-and-parallelism/