Как написать файл в Kafka Producer

Я пытаюсь загрузить простой текстовый файл вместо стандартного ввода в Kafka. После загрузки Kafka я выполнил следующие шаги:

Запущен zookeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

Начальный сервер

bin/kafka-server-start.sh config/server.properties

Создал тему с именем "test":

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Ran the Producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
Test1
Test2

Прослушивание потребителя:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2

Вместо стандартного ввода я хочу передать файл данных или даже простой текстовый файл для Продюсера, который можно увидеть непосредственно у потребителя. Любая помощь будет действительно оценена. Спасибо!

Ответы

Ответ 1

Вы можете передать его в:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt

Найден здесь.

От 0.9.0:

kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

Ответ 2

$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt

работал у меня в Kafka-0.9.0

Ответ 3

Вот несколько способов, которые немного более обобщены, но могут быть излишними для простого файла

хвост

tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

Объяснение

  • tail читается с конца файла по мере его роста или постоянно добавляется журнал.
  • -n0 указывает выходные строки 0, поэтому выбирается только новая строка.
  • -F следует за файлом по имени вместо дескриптора, поэтому он работает, даже если он повернут

Syslog-нг

options {                                                                                                                             
    flush_lines (0);                                                                                                                
    time_reopen (10);                                                                                                               
    log_fifo_size (1000);                                                                                                          
    long_hostnames (off);                                                                                                           
    use_dns (no);                                                                                                                   
    use_fqdn (no);                                                                                                                  
    create_dirs (no);                                                                                                               
    keep_hostname (no);                                                                                                             
};

source s_file {
    file("path to my-file.txt" flags(no-parse));
}


destination loghost {
    tcp("*.*.*.*" port(5140));
} 

потребляя

nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

Объяснение (от man nc)

-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.

-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.

Ссылка

Syslog-ng

Ответ 4

echo "Hello" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic