Кафка на Кубернете мульти- node

Итак, моя цель - создать кластер из нескольких кафк-брокеров распределенным способом. Но я не вижу способа заставить брокеров знать друг о друге.

Насколько я понимаю, каждый брокер нуждается в отдельном идентификаторе в своей конфигурации, который я не могу гарантировать или настроить, если я запускаю контейнеры из кубернетов?

Они также должны иметь один и тот же рекламируемый_host?

Есть ли какие-либо параметры, которые мне не хватает, которые нужно изменить, чтобы узлы могли обнаружить друг друга?

Было бы целесообразно выполнить такую ​​конфигурацию в конце файла Docker с помощью script? И/или общий том?

В настоящее время я пытаюсь сделать это с помощью spotify/kafka-image, который имеет предварительно сконфигурированную комбинацию zookeeper + kafka, на ванильной кубернете.

Ответы

Ответ 1

Мое решение для этого было использовать IP как идентификатор: обрезать точки и получить уникальный идентификатор, который также доступен вне контейнера для других контейнеров.

С помощью службы вы можете получить доступ к IP-адресам нескольких контейнеров (см. мой ответ здесь, как это сделать: Каким образом можно позволить каналам kubenetes общаться друг с другом?

чтобы вы могли получить их идентификаторы, если вы используете IP-адреса как уникальный идентификатор. Единственная проблема заключается в том, что идентификаторы не являются непрерывными или начинаются с 0, но zookeeper/kafka, похоже, не возражают.

РЕДАКТИРОВАТЬ 1:

Следствие касается настройки Zookeeper:

Каждый ZK node должен знать другие узлы. Служба обнаружения Kubernetes известна узлами, которые находятся в службе, поэтому идея состоит в том, чтобы запустить Сервис с узлами ZK.

Эта служба должна быть запущена ПЕРЕД созданием ReplicationController (RC) для модулей Zookeeper.

При запуске script контейнера ZK необходимо:

  • дождитесь, пока служба обнаружения заполнит ZK Service своими узлами (это занимает несколько секунд, теперь я просто добавляю спящий режим 10 в начале моего запуска script, но более надежно вы должны искать службу для имеют по крайней мере 3 узла в нем.)
  • найдите контейнеры, формирующие Сервис в службе обнаружения: это делается путем запроса API. переменная среды KUBERNETES_SERVICE_HOST доступна в каждом контейнере. Конечная точка для поиска описания сервиса затем

URL="http(s)://$USERNAME:[email protected]${KUBERNETES_SERVICE_HOST/api/v1/namespaces/${NAMESPACE}/endpoints/${SERVICE_NAME}"

где NAMESPACE есть default, если вы не изменили его, а SERVICE_NAME будет zookeeper, если вы назвали свой сервисный zookeeper.

там вы получите описание контейнеров, образующих Сервис, с их ip в поле "ip". Вы можете сделать:

curl -s $URL | grep '\"ip\"' | awk '{print $2}' | awk -F\" '{print $2}' 

чтобы получить список IP-адресов в Сервисе. С этим запомните zoo.cfg на node, используя указанный выше идентификатор

Возможно, вам понадобится USERNAME и PASSWORD, чтобы добраться до конечной точки по таким сервисам, как движок контейнера google. Они должны быть помещены в Секретный том (см. Doc здесь: http://kubernetes.io/v1.0/docs/user-guide/secrets.html)

Вам также понадобится использовать curl -s --insecure в Google Container Engine, если вы не столкнетесь с проблемой добавления сертификата CA в свои контейнеры

В основном добавьте том в контейнер и найдите значения из файла. (вопреки тому, что говорит doc, НЕ помещайте \n в конце имени пользователя или пароля при кодировке base64: это просто усложняет вашу жизнь при чтении)

ИЗМЕНИТЬ 2:

Еще одна вещь, которую вам нужно сделать на узлах Kafka, - это получить IP и имена хостов и поместить их в файл /etc/hosts. Кажется, что Kafka знает узлы по именам хостов, и они по умолчанию не установлены в узлах обслуживания.

ИЗМЕНИТЬ 3:

После долгих проб и мыслей, использующих IP как идентификатор, может быть не так много: это зависит от того, как вы настраиваете хранилище. для любого вида распределенного сервиса, такого как zookeeper, kafka, mongo, hdfs, вы можете использовать тип хранилища emptyDir, так что именно на этом node (монтирование удаленных типов хранения поражает цель распространения этих сервисов! ) emptyDir будет relaod с данными на том же node, поэтому представляется логичным использовать идентификатор node ID (node IP) в качестве идентификатора, потому что тогда модуль, который перезагружается на том же node, будет иметь данные. Это предотвратит потенциальное повреждение данных (если новый node начинает писать в том же каталоге, который на самом деле не пуст, кто знает, что может произойти), а также с Kafka, тем, которым назначается broker.id, если идентификатор брокера изменения, zookeeper не обновляет тему broker.id, и тема выглядит как она доступна. НО указывает на неправильный broker.id и это беспорядок.

До сих пор я еще не нашел, как получить IP-адрес node, но я думаю, что это возможно для поиска в API, просматривая имена сервисных имен, а затем node, на которых они развернуты.

EDIT 4

Чтобы получить IP-адрес node, вы можете получить имя хоста pod hostname из API конечных точек /API/v 1/пространство имен/по умолчанию/оконечных/ как объяснялось выше. то вы можете получить IP-адрес node из имени /API/v 1/пространство имен/по умолчанию/стручки/

PS: это вдохновляет пример в репозитории Kubernetes (пример для rethinkdb здесь: https://github.com/kubernetes/kubernetes/tree/master/examples/rethinkdb

Ответ 2

Посмотрите https://github.com/CloudTrackInc/kubernetes-kafka Это позволяет запускать Kafka в кубернетах и ​​поддерживать масштабирование и автоматическое продление.

Ответ 3

Я сделал это с помощью docker-compose (разница для Kubernetes заключалась бы в том, что вы передавали бы ID через ваш service.yaml и имели бы 2 службы):

kafka1:
  build: kafka-0.8.1/
  ports:
  - 9092
  links:
  - zookeeper
  environment:
  - ID=1
kafka2:
  build: kafka-0.8.1/
  ports:
  - 9092
  links:
  - zookeeper
  environment:
  - ID=2

Config:

broker.id=${ID}
port=9092
advertised.host.name=${HOST}
advertised.port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-${ID}
num.partitions=200
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=${DOCKER_ZOOKEEPER_1_PORT_2181_TCP_ADDR}:${DOCKER_ZOOKEEPER_1_PORT_2181_TCP_PORT}
zookeeper.connection.timeout.ms=6000

ш:

#!/bin/bash
echo "Running config"
export HOST=`grep $HOSTNAME /etc/hosts | awk '{print $1}'`
export ID=${ID:?}
perl -p -i -e 's/\$\{([^}]+)\}/defined $ENV{$1} ? $ENV{$1} : $&/eg' < /broker.template > $KAFKA_HOME/config/server.properties
echo "Done"
echo "starting kafka with:"
echo "$KAFKA_HOME/config/server.properties"
echo ""
cat $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

Ответ 4

Это заметно проявляется в моих поисках, но содержит довольно устаревшую информацию. Чтобы обновить это с помощью более современного решения, вы должны использовать StatefulSet развертывание, которое будет генерировать модули, которые имеют целочисленный счетчик вместо хэш на свое имя, например. Кафка-контроллер-0.

Это, конечно, имя хоста, поэтому оттуда просто извлечь искомый идентификатор брокер-инварианта с помощью awk:

hostname | awk -F'-' '{print $3}'

Самые популярные контейнеры, доступные для Kafka в наши дни, имеют команду идентификатора брокера.