Ответ 1
Я нашел проблему. Это была проблема с DNS в конце. Я связывался с брокерами Kafka по IP-адресам, но брокер отвечает DNS-именем. После настройки DNS-имен на стороне потребителя он снова начал работать.
Я пытаюсь проверить потребителя kafka, используя данные из удаленного кластера Kafka. Я получаю следующую ошибку при использовании kafka-console-consumer.sh
:
ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
java.lang.IllegalStateException: No entry found for connection 2147475658
at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:330)
at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:134)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:885)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:276)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:655)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:635)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:436)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:76)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
Вот команда, которую я использую:
./bin/kafka-console-consumer.sh --bootstrap-server SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT} --consumer.config ./config/consumer.properties --topic MYTOPIC --group MYGROUP
Вот файл ./config/consumer.properties
:
bootstrap.servers=SSL://{IP}:{PORT},SSL://{IP}:{PORT},SSL://{IP}:{PORT}
# consumer group id
group.id=MYGROUP
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
auto.offset.reset=earliest
#### Security
security.protocol=SSL
ssl.key.password=test1234
ssl.keystore.location=/opt/kafka/config/certs/keystore.jks
ssl.keystore.password=test1234
ssl.truststore.location=/opt/kafka/config/certs/truststore.jks
ssl.truststore.password=test1234
Ты хоть представляешь, в чем проблема?
Я нашел проблему. Это была проблема с DNS в конце. Я связывался с брокерами Kafka по IP-адресам, но брокер отвечает DNS-именем. После настройки DNS-имен на стороне потребителя он снова начал работать.
У меня была эта проблема (с потребителями и производителями) при запуске Kafka и Zookeeper в качестве контейнеров Docker.
Решение состояло в том, чтобы установить advertised.listeners
в файле config/server.properties
брокеров Kafka, чтобы он содержал IP-адрес контейнера, например
advertised.listeners=PLAINTEXT://172.15.0.8:9092
См. Https://github.com/maxant/kafkaplayground/blob/master/start-kafka.sh для примера сценария, используемого для запуска Kafka внутри контейнера после правильной настройки файла свойств.
Кажется, что свойство слушателя кластера Kafka не настроено в server.properties.
В удаленном кластере kafka это свойство должно быть раскомментировано с правильным именем хоста.
listeners=PLAINTEXT://0.0.0.0:9092
Вы уверены, что удаленная кафка запущена? Я бы предложил запустить nmap -p PORT HOST
, чтобы убедиться, что порт открыт (если он не настроен иначе, порт должен быть 9092). Если это нормально, тогда вы можете использовать kafkacat, который делает вещи проще. Создание потребителя, запускающего kafkacat -b HOST:PORT -t YOUR_TOPIC -C -o beginning
или создание производителя, запускающего kafkacat -b HOST:PORT
-t YOUR_TOPIC -p
Можете ли вы указать, как это исправить. У меня та же проблема
Вы можете увидеть сообщение, что запись не найдена, если версия RestClient отличается от вашей версии ElasticSearch
В моем случае я получал, что при попытке подключиться к контейнеру Kafka мне пришлось передать следующее:
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
Надеюсь, это поможет кому-то
У меня та же проблема, я тоже сказал @yeralin, но я получаю это сообщение. Мой docker-compose.yml такой:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.2.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-enterprise-kafka:5.2.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT://localhost:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:5.2.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://localhost:9092,PLAINTEXT://broker:29092
connect:
image: confluentinc/kafka-connect-datagen:latest
build:
context: .
dockerfile: Dockerfile
hostname: connect
container_name: connect
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.1.1 Connect image
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run$
control-center:
image: confluentinc/cp-enterprise-control-center:5.2.1
hostname: control-center
container_name: control-center
depends_on:
- zookeeper
- broker
- schema-registry
- connect
- ksql-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
CONTROL_CENTER_KSQL_URL: "http://ksql-server:8088"
CONTROL_CENTER_KSQL_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksql-server:
image: confluentinc/cp-ksql-server:5.2.1
hostname: ksql-server
container_name: ksql-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksql-server
KSQL_APPLICATION_ID: "cp-all-in-one"
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
ksql-cli:
image: confluentinc/cp-ksql-cli:5.2.1
container_name: ksql-cli
depends_on:
- broker
- connect
- ksql-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
# Downrev ksql-examples to 5.1.2 due to DEVX-798 (work around issues in 5.2.0)
image: confluentinc/ksql-examples:5.1.2
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksql-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:5.2.1
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
Я следил за страницей @rmoff, но ничего. Я пытаюсь прослушивать события моей базы данных MySQL на AWS EC2, также у меня есть экземпляр с Ubuntu, где я запускаю Kafka в Docker Container.
При этом состояние моего разъема уже работает и его задача тоже. Но я пока не вижу данных из своей таблицы foobar.
В моем случае не удалось найти идентификатор брокера (2147475658), упомянутый по ошибке.
No entry found for connection 2147475658
Вы можете создать брокера с идентификатором 2147475658, установив свойство broker.id в файле server.properties. Создайте отдельные файлы server.properties для всех брокеров.
Или, если у вас есть хотя бы один живой брокер, вы можете удалить/удалить брокера, который выдает ошибку.
Ссылка на документацию: https://kafka.apache.org/documentation/#quickstart_multibroker