Ответ 1
Обновление:
В Kafka 2.5 будет добавлен новый метод KStream#toTable()
, который обеспечит удобный способ преобразования KStream
в KTable
. Подробнее см.: https://cwiki.apache.org/confluence/display/KAFKA/KIP-523%3A+Add+KStream%23toTable+to+the+Streams+DSL
Оригинальный ответ:
На данный момент нет прямого пути сделать это. Ваш подход абсолютно действителен, как обсуждалось в Confluent FAQs: http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
Это самый простой подход в отношении кода. Однако у него есть недостатки: (а) вам нужно управлять дополнительной темой и (б) это приводит к дополнительному сетевому трафику, потому что данные записываются и перечитываются с Kafka.
Есть одна альтернатива, использующая "фиктивное сокращение":
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> stream = ...; // some computation that creates the derived KStream
KTable<String, Long> table = stream.groupByKey().reduce(
new Reducer<Long>() {
@Override
public Long apply(Long aggValue, Long newValue) {
return newValue;
}
},
"dummy-aggregation-store");
Этот подход несколько более сложен в отношении кода по сравнению с вариантом 1, но имеет то преимущество, что (а) не требуется ручное управление темами и (б) повторное чтение данных из Kafka не требуется.
В целом, вам нужно решить, какой подход вам больше нравится:
В варианте 2 Kafka Streams создаст внутреннюю тему журнала изменений для резервного копирования KTable для обеспечения отказоустойчивости. Таким образом, оба подхода требуют некоторого дополнительного хранения в Kafka и приводят к дополнительному сетевому трафику. В целом, это компромисс между немного более сложным кодом в варианте 2 и ручным управлением темой в варианте 1.