2016-06-16 53 views
14

Sadece işleri zorlaştırmak için, rabbitMQ kuyruğundan mesaj almak istiyorum. Şimdi biliyorum tavşan üzerinde MQTT için bir eklenti var (https://www.rabbitmq.com/mqtt.html). Bununla birlikte Spark'in pika'dan üretilen bir mesajı tükettiği bir örnek çalışma yapamıyorum gibi görünebilir.SparkStreaming, pika kullanılarak python'da RabbitMQ ve MQTT

import sys 
import pika 
import json 
import future 
import pprofile 

def sendJson(json): 

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
    channel = connection.channel() 

    channel.queue_declare(queue='analytics', durable=True) 
    channel.queue_bind(exchange='analytics_exchange', 
         queue='analytics') 

    channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json) 
    connection.close() 

if __name__ == "__main__": 
    with open(sys.argv[1],'r') as json_file: 
    sendJson(json_file.read()) 

sparkstreaming tüketici edilir:

Mesela ben şu şekilde bir mesaj yapımcı görebilirsiniz olmadığını görmek için buraya basit wordcount.py programını (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) kullanıyorum aşağıdaki: Ancak basit wordcount örnek aksine

import sys 
import operator 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.mqtt import MQTTUtils 

sc = SparkContext(appName="SS") 
sc.setLogLevel("ERROR") 
ssc = StreamingContext(sc, 1) 
ssc.checkpoint("checkpoint") 
#ssc.setLogLevel("ERROR") 


#RabbitMQ 

"""EXCHANGE = 'analytics_exchange' 
EXCHANGE_TYPE = 'direct' 
QUEUE = 'analytics' 
ROUTING_KEY = 'analytics' 
RESPONSE_ROUTING_KEY = 'analytics-response' 
""" 


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883" 
topic = "analytics" 

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic) 
#dummy functions - nothing interesting... 
words = mqttStream.flatMap(lambda line: line.split(" ")) 
pairs = words.map(lambda word: (word, 1)) 
wordCounts = pairs.reduceByKey(lambda x, y: x + y) 

wordCounts.pprint() 
ssc.start() 
ssc.awaitTermination() 

, ben bu işe ve şu hatayı almaya alınamıyor:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8) 
java.lang.NullPointerException 
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457) 
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273) 

Yani benim sorular sıraya dinlemek için MQTTUtils.createStream(ssc, brokerUrl, topic) açısından ayarları ne olması gerektiğini, ve orada bir daha dolgun örneklerdir olup olmadığını nasıl RabbitMQ olanlar üzerine bu harita.

Ben ile benim tüketici kod çalıştırıyorum:

url_location = 'tcp://localhost' 
url = os.environ.get('', url_location) 
params = pika.URLParameters(url) 
connection = pika.BlockingConnection(params) 

ve kıvılcım olarak akış:

bir yorum tarafından önerildiği gibi, TCP parametreleri ile aşağıdaki gibi ben yapımcı kodu güncelledik ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

brokerUrl = "tcp://127.0.0.1:5672" 
topic = "#" #all messages 

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic) 
records = mqttStream.flatMap(lambda line: json.loads(line)) 
count = records.map(lambda rec: len(rec)) 
total = count.reduce(lambda a, b: a + b) 
total.pprint() 

cevap

2

görünüyor. varsayarsak:

  • Eğer RabbitMQ varsayılan ayarlarla çalışan yerel bir örneği olması ve (ya packages veya jars ile spark-submit/pyspark çalıştırırken
  • spark-streaming-mqtt dahil RabbitMQ sunucusunu MQTT eklentisi (rabbitmq-plugins enable rabbitmq_mqtt) etkin ve yeniden ettik

tcp://localhost:1883 ile TCP'yi kullanarak bağlanabilirsiniz. Ayrıca MQTT'nin amq.topic kullandığını da unutmamalısınız.

Hızlı başlangıç ​​:

  • şu içeriğe sahip Dockerfile oluşturun:

    FROM rabbitmq:3-management 
    
    RUN rabbitmq-plugins enable rabbitmq_mqtt 
    
  • Docker imaj oluşturmak:

    docker build -t rabbit_mqtt . 
    
  • başlangıç ​​görüntüsünü ve sunucu rea kadar bekleyin dy:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    
  • şu içeriğe sahip producer.py oluşturun:

    import pika 
    import time 
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost')) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='amq.topic', 
           type='topic', durable=True) 
    
    for i in range(1000): 
        channel.basic_publish(
         exchange='amq.topic', # amq.topic as exchange 
         routing_key='hello', # Routing key used by producer 
         body='Hello World {0}'.format(i) 
        ) 
        time.sleep(3) 
    
    connection.close() 
    
  • başlangıç ​​yapımcı

    python producer.py 
    

    ve ziyaret yönetim konsolu http://127.0.0.1:15672/#/exchanges/%2F/amq.topic

    mesajları almasını görmek için.

  • şu içeriğe sahip consumer.py oluşturun:

    from pyspark import SparkContext 
    from pyspark.streaming import StreamingContext 
    from pyspark.streaming.mqtt import MQTTUtils 
    
    sc = SparkContext() 
    ssc = StreamingContext(sc, 10) 
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883", # Note both port number and protocol 
        "hello"     # The same routing key as used by producer 
    ) 
    mqttStream.count().pprint() 
    ssc.start() 
    ssc.awaitTermination() 
    ssc.stop() 
    
  • indirme bağımlılıkları (Spark ve Spark versiyonunu oluşturmak için kullanılan birine Scala sürümünü ayarlayın):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 
    
  • emin olun SPARK_HOME ve PYTHONPATH Doğru dizinleri işaret edin.

  • (önceki gibi versiyonlarını ayarlayın) ile consumer.py gönderin:

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py 
    

tüm adımları size Kıvılcım günlüğüne Merhaba dünya mesajları görmelisiniz takip edin.

+0

Teşekkürler. Bir bakacağım. Bu konuyla doğrudan ve iyi çalışabilir mi? – disruptive

+0

MQTT eklentisi [yapılandırılabilir] (https://www.rabbitmq.com/mqtt.html#config) farklı bir değişim kullanmak için ancak bunu anlayabildiğim kadarıyla. MQTT protokolü bundan çok daha zengin değildir. – zero323

+0

Bunu docker olmadan yapılandırmanın bir yolu var - örneğin .config dosyasını kullanarak. Https://www.rabbitmq.com/mqtt.html adresindeki varsayılan ayarlarla denedim. Ama bu hiç işe yaramıyor. Ayarsız olarak, kıvılcım dinleyici aşağıdakilerle bağlantı kurabilir: = BİLGİ RAPORU ==== 5-Tem-2016 :: 11: 52: 08 === MQTT bağlantısını kabul etme <0.321.0> (127.0.0.1:47868 -> 127.0. 0.1: 1883). Ama üretilen mesajlar bu limana nasıl haritalanır? – disruptive

2

MqttAsyncClient Javadoc'tan, sunucu URI'sının aşağıdaki şemalardan birine sahip olması gerekir: tcp://, ssl:// veya local://. Bu şemalardan birine sahip olmak için yukarıdaki brokerUrl numaralı telefonu değiştirmeniz gerekir.

Daha fazla bilgi için buraya MqttAsyncClient için kaynağına bir bağlantı: Yanlış port numarasını kullanarak gibi

https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272

+1

Üreticiyi http yerine tcp kullanacak şekilde değiştirmeyi denedim, ancak şu anda aşağıdaki bağlantı sorununu buldum: ERROR ReceiverSupervisorImpl: Hatalı alıcıyı durdurdu: Bağlantı kesildi (32109) - java.net.SocketException: Bağlantı sıfırlama – disruptive