Как использовать потребительский API Kafka 0.8.2?
Я начинаю с последнего документа Kafka http://kafka.apache.org/documentation.html. Но я сталкиваюсь с некоторыми проблемами, когда я пытаюсь использовать новый API-интерфейс Consumer. Я выполнил следующие действия:
1. Добавить новую зависимость
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>
2. Добавить конфигурации
Map<String, Object> config = new HashMap<String, Object>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"host:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");
3. Использовать API KafkaConsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");
Однако, когда я пытаюсь опросить сообщение от брокера, я получил только нуль:
Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
process(records);
else
System.err.println("null");
И затем я знаю, что неправильно с потребителем после того, как я проверил исходный код:
@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}
Что еще хуже, я не могу найти никакой другой полезной информации об API 0.8.2, так как все использование Kafka несовместимо с последней версией. Может ли кто-нибудь помочь мне? Большое спасибо.
Ответы
Ответ 1
Я также пытаюсь написать "Потребитель" поверх Kafka 0.8.2.1, чтобы читать сообщения, созданные новым продюсером.
До сих пор у меня есть то, что API-интерфейс Producer готов и полезен, в то время как на стороне потребителя мы должны ждать 0.8.3, как отметил @habsq, и вы уже узнали, что для потребителя есть некоторый код, но он все еще не работает.
Таким образом, клиент для использования (текущий API-интерфейс клиента) - это тот, который найден в "основном" проекте вашей текущей версии Kafka, т.е. 0.8.2.1 (лучше не отказывать клиенту в какой-либо другой версии).
Итак, теперь нам нужно импортировать две банки: одну для "новых" java-клиентов и одну для основного проекта, в зависимости также от используемой версии scala (я использую 2.11).
В моем случае я использую graddle для управления зависимостями, поэтому мне просто нужно импортировать
dependencies {
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}
При обновлении зависимостей он получит все необходимые библиотеки.
Если вы используете другую версию scala, просто измените версию; в любом случае вы можете найти всю другую версию или полный pom на центральной станции maven:
http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22
Если вы используете эту реализацию Consumer, все текущие примеры должны работать как обычно.
PS ref: Kafka-users ml thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2
Ответ 2
Да. Я могу подтвердить, что выпуск 0.8.2.1 имел проблемы с потреблением сообщений. Теперь, делая простой потребитель с Java/ Groovy и выпуском 0.10.1.0, все работает отлично.
Нет необходимости устанавливать PARTITION_ASSIGNMENT_STRATEGY
.