Источник Split Akka Stream на два

У меня есть Akka Streams Source, который я хочу разделить на два источника в соответствии с предикатом.

Например, имея источник (типы умышленно упрощаются):

val source: Source[Either[Throwable, String], NotUsed] = ???

И два метода:

def handleSuccess(source: Source[String, NotUsed]): Future[Unit] = ???
def handleFailure(source: Source[Throwable, NotUsed]): Future[Unit] = ???

Я хотел бы иметь возможность разделить source соответствии с предикатом _.isRight и передать правильную часть методу handleSuccess и левой части handleFailure метод handleFailure.

Я попытался использовать Broadcast сплиттер, но в конце концов он требует Sink.

Ответы

Ответ 1

Хотя вы можете выбрать, с какой стороны Source вы хотите получать элементы, из него невозможно создать Source, который дает два выхода, которые, как кажется, вам в конечном итоге потребуются.

Учитывая значение GraphStage ниже, которое существенно разделяет левое и правое значения на два выхода...

/**
  * Fans out left and right values of an either
  * @tparam L left value type
  * @tparam R right value type
  */
class EitherFanOut[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  import akka.stream.{Attributes, Outlet}
  import akka.stream.stage.GraphStageLogic

  override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2[Either[L, R], L, R]("EitherFanOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    var out0demand = false
    var out1demand = false

    setHandler(shape.in, new InHandler {
      override def onPush(): Unit = {

        if (out0demand && out1demand) {
          grab(shape.in) match {
            case Left(l) =>
              out0demand = false
              push(shape.out0, l)
            case Right(r) =>
              out1demand = false
              push(shape.out1, r)
          }
        }
      }
    })

    setHandler(shape.out0, new OutHandler {
      @scala.throws[Exception](classOf[Exception])
      override def onPull(): Unit = {
        if (!out0demand) {
          out0demand = true
        }

        if (out0demand && out1demand) {
          pull(shape.in)
        }
      }
    })

    setHandler(shape.out1, new OutHandler {
      @scala.throws[Exception](classOf[Exception])
      override def onPull(): Unit = {
        if (!out1demand) {
          out1demand = true
        }

        if (out0demand && out1demand) {
          pull(shape.in)
        }
      }
    })
  }
}

.. вы можете направить их, чтобы получать только одну сторону:

val sourceRight: Source[String, NotUsed] = Source.fromGraph(GraphDSL.create(source) { implicit b => s =>
  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> Sink.ignore

  SourceShape(eitherFanOut.out1)
})

Await.result(sourceRight.runWith(Sink.foreach(println)), Duration.Inf)

... или, возможно, более желательно, направьте их на два отдельных Sink:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(source, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val eitherFanOut = b.add(new EitherFanOut[Throwable, String])

  s ~> eitherFanOut.in
  eitherFanOut.out0 ~> l.in
  eitherFanOut.out1 ~> r.in

  ClosedShape
})


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

(Импорт и первоначальная настройка)

import akka.NotUsed
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, InHandler, OutHandler}
import akka.stream._
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val classLoader = getClass.getClassLoader
implicit val system = ActorSystem("QuickStart", ConfigFactory.load(classLoader), classLoader)
implicit val materializer = ActorMaterializer()

val values: List[Either[Throwable, String]] = List(
  Right("B"),
  Left(new Throwable),
  Left(new RuntimeException),
  Right("B"),
  Right("C"),
  Right("G"),
  Right("I"),
  Right("F"),
  Right("T"),
  Right("A")
)

val source: Source[Either[Throwable, String], NotUsed] = Source.fromIterator(() => values.toIterator)

Ответ 2

Это реализовано в akka-stream-contrib как PartitionWith. Добавьте эту зависимость в SBT, чтобы втянуть ее в свой проект:

//latest version available on https://github.com/akka/akka-stream-contrib/releases libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"

PartitionWith имеет форму Broadcast(2), но с потенциально разными типами для каждого из двух выходов. Вы предоставляете ему предикат для применения к каждому элементу, и в зависимости от результата они направляются в соответствующую розетку. Затем вы можете присоединить Sink или Flow к каждому из этих розеток независимо друг от друга. Основываясь на примере cessationofime, с Broadcast заменой на PartitionWith:

val eitherSource: Source[Either[Throwable, String], NotUsed] = Source.empty
val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))

val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)
                                  ((_, _, _)) { implicit b => (s, l, r) =>

  import GraphDSL.Implicits._

  val pw = b.add(
    PartitionWith.apply[Either[Throwable, String], Throwable, String](identity)
  )

  eitherSource ~> pw.in
  pw.out0 ~> leftSink
  pw.out1 ~> rightSink

  ClosedShape
})

val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

Ответ 3

Для этого вы можете использовать трансляцию, затем фильтровать и отображать потоки в GraphDSL:

val leftSink = Sink.foreach[Throwable](s => println(s"FAILURE: $s"))
val rightSink = Sink.foreach[String](s => println(s"SUCCESS: $s"))


val flow = RunnableGraph.fromGraph(GraphDSL.create(eitherSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>

       import GraphDSL.Implicits._

       val broadcast = b.add(Broadcast[Either[Throwable,String]](2))


       s ~> broadcast.in
       broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> l.in
       broadcast.out(1).filter(_.isRight).map(_.right.get) ~> r.in


       ClosedShape
  })


val r = flow.run()
Await.result(Future.sequence(List(r._2, r._3)), Duration.Inf)

Я ожидаю, что вы сможете выполнять функции, которые вы хотите, на карте.

Ответ 4

Тем временем это было введено в стандартную Akka-Streams: https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Partition.html.

Вы можете разделить входной поток с помощью предиката, а затем использовать collect на каждом выходе, чтобы получить только интересующие вас типы.