2015-05-25 21 views
13

Websphere MQ'nin, kıvılcım akışına yönelik bir veri kaynağı olarak, kullanım durumumuzun birisinde ihtiyaç duyulan olasılıkları araştırıyordum. MQTT'un MQ veri yapıları arasındaki iletişimi destekleyen bir protokol olduğunu bilmeliyim, ancak akış kıvılcımına yeni başlayan bir kullanıcı olduğum için aynı çalışma örneklerine ihtiyacım var. MQ, kıvılcım akışı ile MQ'yi bağlamaya çalıştı. Lütfen bunu yapmak için en iyi yolu tasarlayın.Websphere MQ, Apache Spark Streaming için bir veri kaynağı olarak

+1

Oy olarak kapatmak için Konu dışı Overflow'un soru yönergelerine uymadığından konu dışı. Bu geniş mimariye ve fizibilite sorularına http://mqseries.net veya diğer çevrimiçi MQ forumlarından birini sormanızı tavsiye ederim. –

+0

Sadece bir ifade problemi olabileceğini düşünüyorum. Belirsizlik yerine "Bu şeye bakıyordum. En iyi çözüm nedir?" _ Doğrudan bir soru sorabilirsiniz. _ "Websphere MQ'dan Apache Spark ile verileri nasıl okuyabilirim?" _ Websphere MQ tarafı hakkında daha fazla bilgiye sahipseniz, bu konuda daha fazla bilgi ekleyebilirsiniz. SQL'i destekliyor mu? Normalde nasıl sorgularsınız? Hangi müşteriler var? O zaman Spark'i tanıyan biri sana yardım edebilir. –

cevap

3

Yani, burada Websphere MQ bağlanır ve okur CustomMQReceiver için çalışan kod ilanıyla veriler: Sana WebSphere MQ bağlamak bağlanmak için JMS kullanabilirsiniz inanıyoruz ve Apache Camel kullanılabilir

public class CustomMQReciever extends Receiver<String> { String host = null; 
int port = -1; 
String qm=null; 
String qn=null; 
String channel=null; 
transient Gson gson=new Gson(); 
transient MQQueueConnection qCon= null; 

Enumeration enumeration =null; 

public CustomMQReciever(String host , int port, String qm, String channel, String qn) { 
    super(StorageLevel.MEMORY_ONLY_2()); 
    this.host = host; 
    this.port = port; 
    this.qm=qm; 
    this.qn=qn; 
    this.channel=channel; 

} 

public void onStart() { 
    // Start the thread that receives data over a connection 
    new Thread() { 
     @Override public void run() { 
      try { 
       initConnection(); 
       receive(); 
      } 
      catch (JMSException ex) 
      { 
       ex.printStackTrace(); 
      } 
     } 
    }.start(); 
} 
public void onStop() { 
    // There is nothing much to do as the thread calling receive() 
    // is designed to stop by itself isStopped() returns false 
} 

/** Create a MQ connection and receive data until receiver is stopped */ 
private void receive() { 
    System.out.print("Started receiving messages from MQ"); 

    try { 

    JMSMessage receivedMessage= null; 

     while (!isStopped() && enumeration.hasMoreElements()) 
     { 

      receivedMessage= (JMSMessage) enumeration.nextElement(); 
      String userInput = convertStreamToString(receivedMessage); 
      //System.out.println("Received data :'" + userInput + "'"); 
      store(userInput); 
     } 

     // Restart in an attempt to connect again when server is active again 
     //restart("Trying to connect again"); 

     stop("No More Messages To read !"); 
     qCon.close(); 
     System.out.println("Queue Connection is Closed"); 

    } 
    catch(Exception e) 
    { 
     e.printStackTrace(); 
     restart("Trying to connect again"); 
    } 
    catch(Throwable t) { 
     // restart if there is any other error 
     restart("Error receiving data", t); 
    } 
    } 

    public void initConnection() throws JMSException 
{ 
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory(); 
    conFactory.setHostName(host); 
    conFactory.setPort(port); 
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); 
    conFactory.setQueueManager(qm); 
    conFactory.setChannel(channel); 


    qCon= (MQQueueConnection) conFactory.createQueueConnection(); 
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1); 
    MQQueue queue=(MQQueue) qSession.createQueue(qn); 
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue); 
    qCon.start(); 

    enumeration= browser.getEnumeration(); 
    } 

@Override 
public StorageLevel storageLevel() { 
    return StorageLevel.MEMORY_ONLY_2(); 
} 
} 
1

Websphere MQ'ye bağlanmak için böylece (bu kalıp ayrıca JMS olmadan kullanılabileceğini unutmayın) gibi Özel bir Alıcı oluşturabilirsiniz:

class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable { 
    //Transient as this will get passed to the Workers from the Driver 
    @transient 
    var camelContextOption: Option[DefaultCamelContext] = None 

    def onStart() = { 
    camelContextOption = Some(new DefaultCamelContext()) 
    val camelContext = camelContextOption.get 
    val env = new Properties() 
    env.setProperty("java.naming.factory.initial", "???") 
    env.setProperty("java.naming.provider.url", jndiProviderURL) 
    env.setProperty("com.webmethods.jms.clientIDSharing", "true") 
    val namingContext = new InitialContext(env); //using the properties file to create context 

    //Lookup Connection Factory 
    val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory] 
    camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) 

    val builder = new RouteBuilder() { 
     def configure() = { 
      from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10") 
      .process(new Processor() { 
      def process(exchange: Exchange) = { 
       exchange.getIn.getBody match { 
       case s: String => store(s) 
       } 
      } 
      }) 
     } 
     } 
    } 
    builders.foreach(camelContext.addRoutes) 
    camelContext.start() 
    } 

    def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop() 
} 

Ardından bu yüzden sevdiği olayların bir DStream oluşturabilirsiniz:

val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))