Ответ 1
Если условие останова "снаружи потока"
Существует расширенный строительный блок под названием KillSwitch
, который вы можете использовать для этого: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html Поток будет закрыт после уведомления об ошибке.
Он имеет такие методы, как abort(reason)
/shutdown
и т.д., см. здесь API: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html
Справочная документация находится здесь: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala
Пример использования:
val countingSrc = Source(Stream.from(1)).delay(1.second,
DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]
val (killSwitch, last) = countingSrc
.viaMat(KillSwitches.single)(Keep.right)
.toMat(lastSnk)(Keep.both)
.run()
doSomethingElse()
killSwitch.shutdown()
Await.result(last, 1.second) shouldBe 2
Если условие останова находится внутри потока
Вы можете использовать takeWhile
, чтобы выразить какое-либо условие действительно, хотя иногда take
или limit
также может быть достаточно "принять 10 lnes".
Если ваша логика очень продвинута, вы можете создать специальный этап, который обрабатывает эту специальную логику с помощью statefulMapConcat
, которая позволяет выразить буквально все, чтобы вы могли завершить поток, когда захотите "изнутри".