SparkStreaming, RabbitMQ и MQTT в питоне с использованием pika

Просто чтобы все было сложно, я хотел бы получать сообщения из очереди rabbitMQ. Теперь я знаю, что есть плагин для MQTT на кролике (https://www.rabbitmq.com/mqtt.html).

Однако я не могу представить пример работы, где Spark потребляет сообщение, которое было создано из pika.

Например, я использую простую программу wordcount.py здесь (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html), чтобы увидеть, могу ли я увидеть сообщение производитель следующим образом:

import sys
import pika
import json
import future
import pprofile

def sendJson(json):

  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  channel = connection.channel()

  channel.queue_declare(queue='analytics', durable=True)
  channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

  channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
  connection.close()

if __name__ == "__main__":
  with open(sys.argv[1],'r') as json_file:
    sendJson(json_file.read())

Поток потребителя выглядит следующим образом:

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

Однако, в отличие от простого примера wordcount, я не могу заставить это работать и получить следующую ошибку:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)

Итак, мои вопросы: каковы должны быть настройки в терминах MQTTUtils.createStream(ssc, brokerUrl, topic) для прослушивания в очереди и есть ли более полные примеры и как они сопоставляются с параметрами rabbitMQ.

Я запускаю свой потребительский код с помощью: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

Я обновил код производителя следующим образом с параметрами TCP, как предложено одним комментарием:

url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)

и искрообразование, как:

brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()

Ответы

Ответ 1

Похоже, вы используете неправильный номер порта. Предполагая, что:

  • У вас есть локальный экземпляр RabbitMQ с настройками по умолчанию, и вы включили плагин MQTT (rabbitmq-plugins enable rabbitmq_mqtt) и перезапустили сервер RabbitMQ
  • включен spark-streaming-mqtt при выполнении spark-submit/pyspark (либо с packages, либо jars/driver-class-path)

вы можете подключиться с помощью TCP с помощью tcp://localhost:1883. Вы также должны помнить, что MQTT использует amq.topic.

Быстрый старт:

  • создайте Dockerfile со следующим содержимым:

    FROM rabbitmq:3-management
    
    RUN rabbitmq-plugins enable rabbitmq_mqtt
    
  • создать изображение Docker:

    docker build -t rabbit_mqtt .
    
  • запустите изображение и дождитесь готовности сервера:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    
  • создайте producer.py со следующим содержимым:

    import pika
    import time 
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='amq.topic',
                     type='topic', durable=True)
    
    for i in range(1000):
        channel.basic_publish(
            exchange='amq.topic',  # amq.topic as exchange
            routing_key='hello',   # Routing key used by producer
            body='Hello World {0}'.format(i)
        )
        time.sleep(3)
    
    connection.close()
    
  • начать продюсер

    python producer.py
    

    и посетите консоль управления http://127.0.0.1:15672/#/exchanges/%2F/amq.topic

    чтобы увидеть, что сообщения получены.

  • создайте consumer.py со следующим содержимым:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils
    
    sc = SparkContext()
    ssc = StreamingContext(sc, 10)
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883",  # Note both port number and protocol
        "hello"                  # The same routing key as used by producer
    )
    mqttStream.count().pprint()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    
  • загружать зависимости (скорректируйте версию Scala на версию, используемую для создания версии Spark и Spark):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
    
  • убедитесь, что SPARK_HOME и PYTHONPATH указывают на правильные каталоги.

  • отправить consumer.py с помощью (изменить версии, как раньше):

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
    

Если вы выполнили все шаги, вы должны увидеть приветственные сообщения Hello в журнале Spark.

Ответ 2

В MqttAsyncClient Javadoc URI сервера должен иметь одну из следующих схем: tcp://, ssl:// или local://. Вы должны изменить свой brokerUrl выше, чтобы иметь одну из этих схем.

Для получения дополнительной информации здесь ссылка на источник для MqttAsyncClient:

https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272