Как создать поток akka-stream из потока, который генерирует значения рекурсивно?
Мне нужно пересечь API, который имеет форму дерева. Например, структура каталогов или темы обсуждения. Его можно смоделировать с помощью следующего потока:
type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])
def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString
// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
if (id == 0) (1 to 9).toList
else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
else Nil
val itemFlow: Flow[ItemId, Item, NotUsed] =
Flow.fromFunction(id => Item(randomData, nested(id)))
Как я могу пройти эти данные? Я получил следующую работу:
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val loop =
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val source = b.add(Flow[Int])
val merge = b.add(Merge[Int](2))
val fetch = b.add(itemFlow)
val bcast = b.add(Broadcast[Item](2))
val kids = b.add(Flow[Item].mapConcat(_.kids))
val data = b.add(Flow[Item].map(_.data))
val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)
source ~> merge ~> fetch ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
FlowShape(source.in, data.out)
}
val flow = Flow.fromGraph(loop)
Await.result(
Source.single(0).via(flow).runWith(Sink.foreach(println)),
Duration.Inf
)
system.terminate()
Однако, поскольку я использую поток с буфером, Stream никогда не завершится.
Завершается, когда восходящий поток завершен, и буферизированные элементы слиты
Flow.buffer
Я несколько раз читал раздел " График", "Живая" и "Тупики ", и я все еще пытаюсь найти ответ.
Это создаст живой замок:
import java.util.concurrent.atomic.AtomicInteger
def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
// keep track of how many element flows,
val remaining = new AtomicInteger(1) // 1 = seed
// should be > max loop(x)
val bufferSize = 10000
val (ref, publisher) =
Source.actorRef[S](bufferSize, OverflowStrategy.fail)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()
ref ! seed
Source.fromPublisher(publisher)
.via(flow)
.map{x =>
loop(x).foreach{ c =>
remaining.incrementAndGet()
ref ! c
}
x
}
.takeWhile(_ => remaining.decrementAndGet > 0)
}
EDIT: я добавил git repo для проверки вашего решения https://github.com/MasseGuillaume/source-unfold
Ответы
Ответ 1
Я решил эту проблему, написав собственный GraphStage.
import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.ExecutionContext
import scala.collection.mutable
import scala.util.{Success, Failure, Try}
import scala.collection.mutable
def unfoldTree[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}
object UnfoldSource {
implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
def dequeueN(n: Int): List[A] = {
val b = List.newBuilder[A]
var i = 0
while (i < n) {
val e = self.dequeue
b += e
i += 1
}
b.result()
}
}
}
class UnfoldSource[S, E](seeds: List[S],
flow: Flow[S, E, NotUsed],
loop: E => List[S],
bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldSource.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
// Nodes to expand
val frontier = mutable.Queue[S]()
frontier ++= seeds
// Nodes expanded
val buffer = mutable.Queue[E]()
// Using the flow to fetch more data
var inFlight = false
// Sink pulled but the buffer was empty
var downstreamWaiting = false
def isBufferFull() = buffer.size >= bufferSize
def fillBuffer(): Unit = {
val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
val batch = frontier.dequeueN(batchSize)
inFlight = true
val toProcess =
Source(batch)
.via(flow)
.runWith(Sink.seq)(materializer)
val callback = getAsyncCallback[Try[Seq[E]]]{
case Failure(ex) => {
fail(out, ex)
}
case Success(es) => {
val got = es.size
inFlight = false
es.foreach{ e =>
buffer += e
frontier ++= loop(e)
}
if (downstreamWaiting && buffer.nonEmpty) {
val e = buffer.dequeue
downstreamWaiting = false
sendOne(e)
} else {
checkCompletion()
}
()
}
}
toProcess.onComplete(callback.invoke)
}
override def preStart(): Unit = {
checkCompletion()
}
def checkCompletion(): Unit = {
if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
completeStage()
}
}
def sendOne(e: E): Unit = {
push(out, e)
checkCompletion()
}
def onPull(): Unit = {
if (buffer.nonEmpty) {
sendOne(buffer.dequeue)
} else {
downstreamWaiting = true
}
if (!isBufferFull && frontier.nonEmpty) {
fillBuffer()
}
}
setHandler(out, this)
}
}
Ответ 2
Причина невыполнения
Я не думаю, что причина потока никогда не заканчивается из-за "использования потока с буфером". Фактическая причина, аналогичная этому вопросу, заключается в том, что слияние с параметром по умолчанию eagerClose=False
ожидает завершения как source
и buffer
до его завершения (слияния). Но буфер ждет завершения слияния. Таким образом, слияние происходит в буфере, и буфер ожидает слияния.
eagerClose merge
Вы можете установить eagerClose=True
при создании слияния. Но, пользуясь нетерпением, может, к сожалению, привести к тому, что некоторые значения ItemId
не будут запрашиваться.
Косвенное решение
Если вы материализуете новый поток для каждого уровня дерева, то рекурсия может быть извлечена за пределами потока.
Вы можете построить функцию запроса, используя itemFlow
:
val itemQuery : Iterable[ItemId] => Future[Seq[Data]] =
(itemIds) => Source.apply(itemIds)
.via(itemFlow)
.runWith(Sink.seq[Data])
Теперь эту функцию запроса можно обернуть внутри рекурсивной вспомогательной функции:
val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] =
(itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
val allNewKids = allNewData.flatMap(_.kids).toSet
if(allNewKids.isEmpty)
Future.successful(currentData ++ allNewData)
else
recQuery(allNewKids, currentData ++ data)
}
Количество создаваемых потоков будет равно максимальной глубине дерева.
К сожалению, поскольку Futures задействованы, эта рекурсивная функция не является хвостовой рекурсивной и может привести к "переполнению стека", если дерево слишком глубокое.
Ответ 3
Ах, радости циклов в потоках Акки. У меня была очень похожая проблема, которую я решил глубоко взламываю. Возможно, это будет полезно для вас.
Hacky Решение:
// add a graph stage that will complete successfully if it sees no element within 5 seconds
val timedStopper = b.add(
Flow[Item]
.idleTimeout(5.seconds)
.recoverWithRetries(1, {
case _: TimeoutException => Source.empty[Item]
}))
source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
merge <~ buffer <~ kids <~ bcast
Это происходит через 5 секунд после того, как последний элемент проходит через этап timedStopper
, этот этап успешно завершает поток. Это достигается с помощью idleTimeout
, который не работает с потоком с TimeoutException
, а затем с помощью recoverWithRetries
чтобы превратить этот сбой в успешное завершение. (Я упоминал, что он был взломан).
Это, очевидно, не подходит, если у вас может быть более 5 секунд между элементами, или если вы не можете позволить себе долгое ожидание между потоком "на самом деле" и Akka, набирающим его. К счастью, ни одна из нас не беспокоилась, и в этом случае это действительно работает очень хорошо!
Небедование
К сожалению, единственное, что я могу придумать, чтобы сделать это без обмана с помощью тайм-аутов, очень и очень сложно.
В принципе, вам нужно иметь возможность отслеживать две вещи:
- существуют ли какие-либо элементы в буфере или в процессе отправки в буфер
- входной источник открыт
и завершить поток, если и только если ответ на оба вопроса - нет. Нативные строительные блоки Akka, вероятно, не смогут справиться с этим. Однако настраиваемый график может быть выполнен. Параметр может состоять в том, чтобы записать тот, который заменяет Merge
и дать ему некоторый способ узнать о содержимом буфера или, возможно, отслеживать как полученные ID, так и идентификаторы, отправляемые широковещательной передачей в буфер. Проблема заключается в том, что настраиваемые этапы графа не особенно приятны для написания в лучшие времена, не говоря уже о том, что вы смешиваете логику на разных этапах.
Предупреждения
Потоки Akka просто не работают с циклами, особенно в том, как они вычисляют завершение. В результате это может быть не единственная проблема, с которой вы сталкиваетесь.
Например, проблема, с которой мы столкнулись с очень похожей структурой, заключалась в том, что неудача в источнике рассматривалась как успешно завершающийся поток с последующим Future
. Проблема заключается в том, что по умолчанию неудачный этап не удастся остановить его, а отменит его восходящий поток (который считается успешным завершением для этих этапов). С циклом, подобным тому, который у вас есть, результатом является гонка, так как аннулирование распространяется по одной ветки, но с другой. Вам также необходимо проверить, что произойдет, если ошибки приемника; в зависимости от настроек отмены для трансляции, возможно, отмена не будет распространяться вверх, и источник с радостью продолжит потянуть элементы.
Один из последних вариантов: вообще не обращаться к рекурсивной логике с потоками. С одной стороны, если вам удастся написать один хвостовой рекурсивный метод, который вытаскивает все вложенные элементы сразу и помещает его в стадию потока, это решит ваши проблемы. С другой стороны, мы серьезно подумываем о том, чтобы отправиться в Кафку для собственной системы.