Создание потока от актера в потоках Акка
Возможно создание источников и стоков от участников с помощью методов Source.actorPublisher()
и Sink.actorSubscriber()
соответственно. Но возможно ли создать Flow
от актера?
Понятно, что это не оправдано, потому что оно реализует как признаки ActorPublisher
, так и ActorSubscriber
, но, к сожалению, объект Flow
не имеет метода для этого. В этом отличное сообщение в блоге, сделанное в более ранней версии потоков Akka, поэтому вопрос в том, возможно ли это также в последней версии (2.4.9).
Ответы
Ответ 1
Я участвую в команде Akka и хотел бы использовать этот вопрос, чтобы прояснить некоторые вещи об интерфейсах raw Reactive Streams. Надеюсь, вы найдете это полезным.
В частности, мы опубликуем несколько сообщений в блоге команды Akka о создании пользовательских этапов, включая потоки, в ближайшее время, поэтому следите за ними.
Не используйте ActorPublisher/ActorSubscriber
Пожалуйста, не используйте ActorPublisher
и ActorSubscriber
. Они слишком низки, и вы можете их реализовать таким образом, чтобы нарушить спецификацию Reactive Streams. Они - реликт прошлого, и даже тогда это был только "режим власти пользователя". В наши дни нет причин использовать эти классы. Мы никогда не предоставляли способ построить поток, потому что сложность просто взрывоопасна, если она была раскрыта как "сырой" API-интерфейс Actor для реализации и получения всех правил, которые были правильно реализованы.
Если вы действительно хотите реализовать исходные интерфейсы ReactiveStreams, пожалуйста, используйте Спецификацию TCK, чтобы убедиться, что ваша реализация верна. Скорее всего, вас убьют некоторые более сложные угловые случаи a Flow
(или в терминологии RS a Processor
).
Большинство операций можно построить без перехода на низкоуровневый
Многие потоки, которые вы должны просто построить, построив из Flow[T]
и добавив необходимые операции на него, как пример:
val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
Это описание многократного использования потока.
Поскольку вы спрашиваете о режиме пользователя с питанием, это самый мощный оператор самой DSL: statefulFlatMapConcat
. Подавляющее большинство операций, работающих на элементах простого потока, можно выразить с помощью него: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T]
.
Если вам нужны таймеры, вы можете zip
с Source.timer
и т.д.
GraphStage - это самый простой и безопасный API для создания настраиваемых этапов
Вместо этого для построения источников/потоков/стоков имеет свой собственный мощный и безопасный API: GraphStage
. Пожалуйста, прочитайте документацию о создании пользовательских GraphStages (они могут быть Sink/Source/Flow или даже любой произвольной формой). Он обрабатывает все сложные правила реактивных потоков для вас, предоставляя вам полную свободу и безопасность типов при реализации ваших этапов (который может быть потоком).
Например, взятый из документов, представляет собой реализацию GraphStage оператора filter(T => Boolean)
:
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Он также обрабатывает асинхронные каналы и по умолчанию плавкий.
В дополнение к документам, эти сообщения в блоге подробно объясняют, почему этот API является святым граалем построения пользовательских этапов любой формы:
Ответ 2
Решение Konrad демонстрирует, как создать собственный этап, использующий актеров, но в большинстве случаев я думаю, что это немного излишне.
Обычно у вас есть актер, способный отвечать на вопросы:
val actorRef : ActorRef = ???
type Input = ???
type Output = ???
val queryActor : Input => Future[Output] =
(actorRef ? _) andThen (_.mapTo[Output])
Это можно легко использовать с базовой функциональностью Flow
которая принимает максимальное количество одновременных запросов:
val actorQueryFlow : Int => Flow[Input, Output, _] =
(parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)
Теперь actorQueryFlow
можно интегрировать в любой поток...
Ответ 3
Вот решение для построения с использованием графического этапа. Актер должен подтвердить все сообщения, чтобы оказать обратное давление. Актер уведомляется, когда поток терпит неудачу/завершается, и поток терпит неудачу, когда актер завершается. Это может быть полезно, если вы не хотите использовать ask, например, когда не каждое входное сообщение имеет соответствующее выходное сообщение.
import akka.actor.{ActorRef, Status, Terminated}
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
object ActorRefBackpressureFlowStage {
case object StreamInit
case object StreamAck
case object StreamCompleted
case class StreamFailed(ex: Throwable)
case class StreamElementIn[A](element: A)
case class StreamElementOut[A](element: A)
}
/**
* Sends the elements of the stream to the given 'ActorRef' that sends back back-pressure signal.
* First element is always 'StreamInit', then stream is waiting for acknowledgement message
* 'ackMessage' from the given actor which means that it is ready to process
* elements. It also requires 'ackMessage' message after each stream element
* to make backpressure work. Stream elements are wrapped inside 'StreamElementIn(elem)' messages.
*
* The target actor can emit elements at any time by sending a 'StreamElementOut(elem)' message, which will
* be emitted downstream when there is demand.
*
* If the target actor terminates the stage will fail with a WatchedActorTerminatedException.
* When the stream is completed successfully a 'StreamCompleted' message
* will be sent to the destination actor.
* When the stream is completed with failure a 'StreamFailed(ex)' message will be send to the destination actor.
*/
class ActorRefBackpressureFlowStage[In, Out](private val flowActor: ActorRef) extends GraphStage[FlowShape[In, Out]] {
import ActorRefBackpressureFlowStage._
val in: Inlet[In] = Inlet("ActorFlowIn")
val out: Outlet[Out] = Outlet("ActorFlowOut")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private lazy val self = getStageActor {
case (_, StreamAck) =>
if(firstPullReceived) {
if (!isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}
} else {
pullOnFirstPullReceived = true
}
case (_, StreamElementOut(elemOut)) =>
val elem = elemOut.asInstanceOf[Out]
emit(out, elem)
case (_, Terminated(targetRef)) =>
failStage(new WatchedActorTerminatedException("ActorRefBackpressureFlowStage", targetRef))
case (actorRef, unexpected) =>
failStage(new IllegalStateException(s"Unexpected message: '$unexpected' received from actor '$actorRef'."))
}
var firstPullReceived: Boolean = false
var pullOnFirstPullReceived: Boolean = false
override def preStart(): Unit = {
//initialize stage actor and watch flow actor.
self.watch(flowActor)
tellFlowActor(StreamInit)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elementIn = grab(in)
tellFlowActor(StreamElementIn(elementIn))
}
override def onUpstreamFailure(ex: Throwable): Unit = {
tellFlowActor(StreamFailed(ex))
super.onUpstreamFailure(ex)
}
override def onUpstreamFinish(): Unit = {
tellFlowActor(StreamCompleted)
super.onUpstreamFinish()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if(!firstPullReceived) {
firstPullReceived = true
if(pullOnFirstPullReceived) {
if (!isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}
}
}
}
override def onDownstreamFinish(): Unit = {
tellFlowActor(StreamCompleted)
super.onDownstreamFinish()
}
})
private def tellFlowActor(message: Any): Unit = {
flowActor.tell(message, self.ref)
}
}
override def shape: FlowShape[In, Out] = FlowShape(in, out)
}