Ответ 1
Один из способов иметь неограниченный источник - использовать в качестве источника особый вид актера, который смешивается с признаком ActorPublisher
. Если вы создаете один из этих видов участников, а затем завершите вызовом ActorPublisher.apply
, вы получите экземпляр "Реактивные потоки" Publisher
, и с ним вы можете использовать apply
из Source
, чтобы создать Source
от него. После этого вам просто нужно убедиться, что ваш класс ActorPublisher
правильно обрабатывает протокол Reactive Streams для отправки элементов вниз по течению, и вам хорошо идти. Очень тривиальный пример:
import akka.actor._
import akka.stream.actor._
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl._
object DynamicSourceExample extends App{
implicit val system = ActorSystem("test")
implicit val materializer = ActorFlowMaterializer()
val actorRef = system.actorOf(Props[ActorBasedSource])
val pub = ActorPublisher[Int](actorRef)
Source(pub).
map(_ * 2).
runWith(Sink.foreach(println))
for(i <- 1 until 20){
actorRef ! i.toString
Thread.sleep(1000)
}
}
class ActorBasedSource extends Actor with ActorPublisher[Int]{
import ActorPublisherMessage._
var items:List[Int] = List.empty
def receive = {
case s:String =>
if (totalDemand == 0)
items = items :+ s.toInt
else
onNext(s.toInt)
case Request(demand) =>
if (demand > items.size){
items foreach (onNext)
items = List.empty
}
else{
val (send, keep) = items.splitAt(demand.toInt)
items = keep
send foreach (onNext)
}
case other =>
println(s"got other $other")
}
}