Ответ 1
Если вы используете Kafka Streams, вам необходимо применить функции/операторы к вашим потокам данных. В вашем случае вы создаете объект KStream
, таким образом, вы хотите применить оператор к source
.
В зависимости от того, что вы хотите сделать, есть операторы, которые применяют функцию к каждой записи в потоке независимо (например, map()
), или другие операторы, которые применяют функцию к нескольким записям вместе (например, aggregateByKey()
). Вы должны заглянуть в документацию: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl и примеры https://github.com/confluentinc/kafka- потоки-примеры
Таким образом, вы никогда не создаете локальные переменные, используя потоки Kafka, как показано в примере выше, а скорее встраиваете все в операторы/функции, которые объединяются в цепочки.
Например, если вы хотите распечатать все входные записи на стандартный вывод, вы можете сделать
KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
Таким образом, после того, как вы запустите свое приложение через streams.start()
, оно будет использовать записи из вашей входной темы, и для каждой записи вашей темы будет выполнен вызов apply(...)
, который печатает запись в stdout.
Конечно, более естественным способом печати потока на консоль было бы использование source.print()
(который внутренне в основном совпадает с показанным оператором foreach()
с уже заданным ForeachAction
.)
Для вашего примера с назначением строки локальной переменной вам нужно будет поместить свой код в apply(...)
и выполнить там регулярные выражения и т.д., Чтобы "извлечь 3-буквенные ключевые слова".
Лучший способ выразить это, однако, был бы через комбинацию flatMapValues()
и print()
(то есть source.flatMapValues(...).print()
). flatMapValues()
вызывается для каждой входной записи (в вашем случае я предполагаю, что ключ будет null
поэтому вы можете его игнорировать). В вашей функции flatMapValue
вы применяете регулярное выражение и для каждого совпадения добавляете совпадение в список значений, которые вы в конечном итоге возвращаете.
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> keywords = new ArrayList<String>();
// apply regex to value and for each match add it to keywords
return keywords;
}
}
Выходом flatMapValues
снова будет KStream
, содержащий запись для каждого найденного ключевого слова (т. KStream
Выходной поток представляет собой "объединение" по всем спискам, которые вы возвращаете в ValueMapper#apply()
). Наконец, вы просто выводите свой результат на консоль с помощью print()
. (Конечно, вы могли бы также использовать один foreach
вместо flatMapValue
+ print
но это было бы менее модульно.)