Доступ к базовому ActorRef потока akka Источник, созданный Source.actorRef
Я пытаюсь использовать метод Source.actorRef для создания объекта akka.stream.scaladsl.Source. Что-то вроде формы
import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source
case class Weather(zip : String, temp : Double, raining : Boolean)
val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)
val sunnySource = weatherSource.filter(!_.raining)
...
Мой вопрос: как отправить данные на исходный объект на основе ActorRef?
Я предположил, что отправка сообщений в Источник была чем-то вроде
//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)
Но weatherSource
не имеет оператора !
или tell
.
Документация не слишком описательна в отношении использования Source.actorRef, она просто говорит, что вы можете...
Заранее благодарю за ваш отзыв и ответ.
Ответы
Ответ 1
Вам понадобится Flow
:
import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{Sink, Flow}
case class Weather(zip : String, temp : Double, raining : Boolean)
val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)
val sunnySource = weatherSource.filter(!_.raining)
val ref = Flow[Weather]
.to(Sink.ignore)
.runWith(sunnySource)
ref ! Weather("02139", 32.0, true)
Помните, что все это экспериментально и может измениться!
Ответ 2
Как @Noah указывает на экспериментальную природу akka-потоков, его ответ может не работать с выпуском 1.0. Я должен был следовать примеру в этом примере:
implicit val materializer = ActorMaterializer()
val (actorRef: ActorRef, publisher: Publisher[TweetInfo]) = Source.actorRef[TweetInfo](1000, OverflowStrategy.fail).toMat(Sink.publisher)(Keep.both).run()
actorRef ! TweetInfo(...)
val source: Source[TweetInfo, Unit] = Source[TweetInfo](publisher)
Ответ 3
Экземпляр ActorRef
, как и все "материализованные значения", станет доступен только после того, как будет реализован весь поток, или, другими словами, при запуске RunnableGraph.
// RunnableGraph[ActorRef] means that you get ActorRef when you run the graph
val rg1: RunnableGraph[ActorRef] = sunnySource.to(Sink.foreach(println))
// You get ActorRef instance as a materialized value
val actorRef1: ActorRef = rg1.run()
// Or even more correct way: to materialize both ActorRef and future to completion
// of the stream, so that we know when we are done:
// RunnableGraph[(ActorRef, Future[Done])] means that you get tuple
// (ActorRef, Future[Done]) when you run the graph
val rg2: RunnableGraph[(ActorRef, Future[Done])] =
sunnySource.toMat(Sink.foreach(println))(Keep.both)
// You get both ActorRef and Future[Done] instances as materialized values
val (actorRef2, future) = rg2.run()
actorRef2 ! Weather("90210", 72.0, false)
actorRef2 ! Weather("02139", 32.0, true)
actorRef2 ! akka.actor.Status.Success("Done!") // Complete the stream
future onComplete { /* ... */ }