Как использовать потребительский API Kafka 0.8.2?

Я начинаю с последнего документа Kafka http://kafka.apache.org/documentation.html. Но я сталкиваюсь с некоторыми проблемами, когда я пытаюсь использовать новый API-интерфейс Consumer. Я выполнил следующие действия:

1. Добавить новую зависимость

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2. Добавить конфигурации

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3. Использовать API KafkaConsumer

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

Однако, когда я пытаюсь опросить сообщение от брокера, я получил только нуль:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

И затем я знаю, что неправильно с потребителем после того, как я проверил исходный код:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

Что еще хуже, я не могу найти никакой другой полезной информации об API 0.8.2, так как все использование Kafka несовместимо с последней версией. Может ли кто-нибудь помочь мне? Большое спасибо.

Ответы

Ответ 1

Я также пытаюсь написать "Потребитель" поверх Kafka 0.8.2.1, чтобы читать сообщения, созданные новым продюсером.

До сих пор у меня есть то, что API-интерфейс Producer готов и полезен, в то время как на стороне потребителя мы должны ждать 0.8.3, как отметил @habsq, и вы уже узнали, что для потребителя есть некоторый код, но он все еще не работает.

Таким образом, клиент для использования (текущий API-интерфейс клиента) - это тот, который найден в "основном" проекте вашей текущей версии Kafka, т.е. 0.8.2.1 (лучше не отказывать клиенту в какой-либо другой версии).

Итак, теперь нам нужно импортировать две банки: одну для "новых" java-клиентов и одну для основного проекта, в зависимости также от используемой версии scala (я использую 2.11).

В моем случае я использую graddle для управления зависимостями, поэтому мне просто нужно импортировать

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}

При обновлении зависимостей он получит все необходимые библиотеки.

Если вы используете другую версию scala, просто измените версию; в любом случае вы можете найти всю другую версию или полный pom на центральной станции maven: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

Если вы используете эту реализацию Consumer, все текущие примеры должны работать как обычно.

PS ref: Kafka-users ml thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

Ответ 2

Да. Я могу подтвердить, что выпуск 0.8.2.1 имел проблемы с потреблением сообщений. Теперь, делая простой потребитель с Java/ Groovy и выпуском 0.10.1.0, все работает отлично.

Нет необходимости устанавливать PARTITION_ASSIGNMENT_STRATEGY.