Как разбить входящий поток на символ разделителя, используя потоки Akka
Я немного экспериментировал с экспериментальным API Akka Streams, и у меня есть прецедент, который я хотел бы увидеть, как реализовать. Для моего варианта использования у меня есть StreamTcp
Flow
, который подается от привязки входного потока подключений к моему серверному сокету. Поток, который у меня есть, основан на данных ByteString
, входящих в него. Данные, которые поступают, будут иметь разделитель в нем, что означает, что я должен рассматривать все до разделителя как одно сообщение и все после следующего и следующего следующего разделителя в качестве следующего сообщения. Поэтому, играя с более простым примером, не используя сокеты и только статический текст, это то, что я придумал:
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
Flow(data).
splitWhen(c => c == '.').
foreach{producer =>
Flow(producer).
filter(c => c != '.').
fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
map(_.toString).
filter(!_.isEmpty).
foreach(println(_)).
consume(FlowMaterializer(MaterializerSettings()))
}.
onComplete(FlowMaterializer(MaterializerSettings())) {
case any =>
system.shutdown
}
}
}
Основная функция на Flow
, которую я нашел для достижения моей цели, была splitWhen
, которая затем создает дополнительные подпотоки, по одному для каждого сообщения на разделитель .
. Затем я обрабатываю каждый подпоток с другим конвейером шагов, наконец печатая отдельные сообщения в конце.
Все это кажется немного многословным, чтобы выполнить то, что я считал довольно простым и распространенным вариантом использования. Поэтому мой вопрос: есть ли более чистый и менее верный способ сделать это или это правильный и предпочтительный способ разделить поток на разделитель?
Ответы
Ответ 1
После публикации этого же вопроса в группе пользователей Akka я получил некоторые предложения от Эндра Варги и Виктора Клана (https://groups.google.com/forum/#!topic/akka-user/YsnwIAjQ3EE). Я закончил с предложением Endre Transformer
, а затем использовал метод transform
на Flow
. Несколько измененная версия моего предыдущего примера приведена ниже:
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
import akka.stream.Transformer
import akka.util.ByteStringBuilder
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
implicit val mater = FlowMaterializer(MaterializerSettings())
val data = List(
ByteString("Lorem Ipsum is"),
ByteString(" simply.Dummy text of.The prin"),
ByteString("ting.And typesetting industry.")
)
Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_))
}
}
С определением PeriodDelimitedTransformer
будет следующее:
class PeriodDelimitedTransformer extends Transformer[ByteString,String]{
val buffer = new ByteStringBuilder
def onNext(msg:ByteString) = {
val msgString = msg.utf8String
val delimIndex = msgString.indexOf('.')
if (delimIndex == -1){
buffer.append(msg)
List.empty
}
else{
val parts = msgString.split("\\.")
val endsWithDelim = msgString.endsWith(".")
buffer.putBytes(parts.head.getBytes())
val currentPiece = buffer.result.utf8String
val otherPieces = parts.tail.dropRight(1).toList
buffer.clear
val lastPart =
if (endsWithDelim){
List(parts.last)
}
else{
buffer.putBytes(parts.last.getBytes())
List.empty
}
val result = currentPiece :: otherPieces ::: lastPart
result
}
}
}
Итак, некоторые сложности моего предыдущего решения свернуты в этот Transformer
, но это похоже на лучший подход. В моем первоначальном решении поток заканчивается тем, что он разбивается на несколько подпотоков, и это не совсем то, что я хотел.
Ответ 2
Похоже, что API был недавно улучшен, включив akka.stream.scaladsl.Framing. Документация также содержит пример о том, как ее использовать. Что касается вашего конкретного вопроса:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Source}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
object TcpDelimiterBasedMessaging extends App {
object chunks {
val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
val second = ByteString("More text.delimited by.a period.")
}
implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference())
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()
Source(chunks.first :: chunks.second :: Nil)
.via(Framing.delimiter(ByteString("."), Int.MaxValue))
.map(_.utf8String)
.runForeach(println)
.onComplete(_ => system.terminate())
}
Производит следующий вывод:
Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry
More text
delimited by
a period
Ответ 3
Существует пример кода, который делает что-то подобное, опубликованное сейчас в Поток-книге Streams в документации потоков akka в Разбор строк из потока ByteStrings.
Ответ 4
Я думаю, что использование Framing
у Андрея - лучшее решение вашего вопроса, но у меня была аналогичная проблема, и я нашел Framing
слишком ограниченным. Вместо этого я использовал statefulMapConcat
, что позволяет группировать ваш вход ByteString с помощью любых правил, которые вам нравятся. Вот код для вашего вопроса, если он кому-то поможет:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
object BasicTransformation extends App {
implicit val system = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
implicit val dispatcher = system.dispatcher
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
val grouping = Flow[Byte].statefulMapConcat { () =>
var bytes = ByteString()
byt =>
if (byt == '.') {
val string = bytes.utf8String
bytes = ByteString()
List(string)
} else {
bytes :+= byt
Nil
}
}
Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate())
}
Что производит:
Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry