Sadece işleri zorlaştırmak için, rabbitMQ kuyruğundan mesaj almak istiyorum. Şimdi biliyorum tavşan üzerinde MQTT için bir eklenti var (https://www.rabbitmq.com/mqtt.html). Bununla birlikte Spark'in pika'dan üretilen bir mesajı tükettiği bir örnek çalışma yapamıyorum gibi görünebilir.SparkStreaming, pika kullanılarak python'da RabbitMQ ve MQTT
import sys
import pika
import json
import future
import pprofile
def sendJson(json):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='analytics', durable=True)
channel.queue_bind(exchange='analytics_exchange',
queue='analytics')
channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
connection.close()
if __name__ == "__main__":
with open(sys.argv[1],'r') as json_file:
sendJson(json_file.read())
sparkstreaming tüketici edilir:
Mesela ben şu şekilde bir mesaj yapımcı görebilirsiniz olmadığını görmek için buraya basit wordcount.py programını (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) kullanıyorum aşağıdaki: Ancak basit wordcount örnek aksine
import sys
import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")
#RabbitMQ
"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""
brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
, ben bu işe ve şu hatayı almaya alınamıyor:
16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)
Yani benim sorular sıraya dinlemek için MQTTUtils.createStream(ssc, brokerUrl, topic)
açısından ayarları ne olması gerektiğini, ve orada bir daha dolgun örneklerdir olup olmadığını nasıl RabbitMQ olanlar üzerine bu harita.
Ben ile benim tüketici kod çalıştırıyorum:
url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
ve kıvılcım olarak akış:
bir yorum tarafından önerildiği gibi, TCP parametreleri ile aşağıdaki gibi ben yapımcı kodu güncelledik./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py
brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages
mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()
Teşekkürler. Bir bakacağım. Bu konuyla doğrudan ve iyi çalışabilir mi? – disruptive
MQTT eklentisi [yapılandırılabilir] (https://www.rabbitmq.com/mqtt.html#config) farklı bir değişim kullanmak için ancak bunu anlayabildiğim kadarıyla. MQTT protokolü bundan çok daha zengin değildir. – zero323
Bunu docker olmadan yapılandırmanın bir yolu var - örneğin .config dosyasını kullanarak. Https://www.rabbitmq.com/mqtt.html adresindeki varsayılan ayarlarla denedim. Ama bu hiç işe yaramıyor. Ayarsız olarak, kıvılcım dinleyici aşağıdakilerle bağlantı kurabilir: = BİLGİ RAPORU ==== 5-Tem-2016 :: 11: 52: 08 === MQTT bağlantısını kabul etme <0.321.0> (127.0.0.1:47868 -> 127.0. 0.1: 1883). Ama üretilen mesajlar bu limana nasıl haritalanır? – disruptive