Преобразование Slick Streaming данных и отправка Chunked Response с использованием Akka Http
Цель состоит в том, чтобы передавать данные из базы данных, выполнять некоторые вычисления на этом фрагменте данных (это вычисление возвращает Будущее некоторого класса case) и отправлять эти данные как ответные запросы пользователю. В настоящее время я могу передавать данные и отправлять ответ без каких-либо вычислений. Однако я не могу выполнить это вычисление, а затем передать результат.
Это маршрут, который я реализовал.
def streamingDB1 =
path("streaming-db1") {
get {
val src = Source.fromPublisher(db.stream(getRds))
complete(src)
}
}
Функция getRds возвращает строки таблицы, отображаемой в класс case (Use slick). Теперь рассмотрим вычисление функции, которое берет каждую строку в качестве входного и возвращает будущее другого класса case. Что-то вроде
def compute(x: Tweet) : Future[TweetNew] = ?
Как я могу реализовать эту функцию в переменной src и отправить пользователю запрос (как поток) этого вычисления в chunked response.
Ответы
Ответ 1
Вы можете преобразовать источник, используя scala.concurrent.Future[T]):FlowOps.this.Repr[T]#mapAsync[T](parallelism:Int)(f:Out=>scala.concurrent.Future[T]):FlowOps.this.Repr[T] rel="nofollow noreferrer"> mapAsync
:
val src =
Source.fromPublisher(db.stream(getRds))
.mapAsync(parallelism = 3)(compute)
complete(src)
При необходимости отрегулируйте уровень параллелизма.
Обратите внимание, что вам может потребоваться настроить несколько параметров, как указано в документации Slick:
Примечание. В некоторых системах баз данных могут потребоваться определенные параметры сеанса для поддержки потоковой передачи без кэширования всех данных сразу в памяти на стороне клиента. Например, PostgreSQL требует как .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)
(с требуемым размером страницы n
) и .transactionally
для правильной потоковой передачи.
Например, если вы используете PostgreSQL, то ваш Source
может выглядеть примерно так:
val src =
Source.fromPublisher(
db.stream(
getRds.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10
).transactionally
)
).mapAsync(parallelism = 3)(compute)
Ответ 2
Вам нужно иметь способ сортировать TweetNew, а также если вы отправляете кусок длиной 0, клиент может закрыть соединение.
Этот код работает с завихрением:
case class TweetNew(str: String)
def compute(string: String) : Future[TweetNew] = Future {
TweetNew(string)
}
val route = path("hello") {
get {
val byteString: Source[ByteString, NotUsed] = Source.apply(List("t1", "t2", "t3"))
.mapAsync(2)(compute)
.map(tweet => ByteString(tweet.str + "\n"))
complete(HttpEntity(ContentTypes.'text/plain(UTF-8)', byteString))
}
}