2014-09-08 14 views
5

Geçtiğimiz günlerde Camel'in Kafka için kendi bileşenine sahip olduğunu fark ettim, bu yüzden ona bir girdap vermeye karar verdim. Camel Kafka Entegrasyonu

Ben güzel basit dosyayı denemeye karar verdi - ... En fazla bu bu çalıştıran, ancak, yeterince basit görünüyor

<route> 
     <from uri="file:///tmp/input" /> 
     <setHeader headerName="kafka.PARTITION_KEY"> 
      <constant>Test</constant> 
     </setHeader> 
     <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1" /> 
</route> 

... aşağıdaki gibi> kafka konuyu

java.lang.ClassCastException: java.lang.String cannot be cast to [B 
    at kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) 
    at org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:78) 

ve Deve kod kontrolü, bir Açıkçası

String msg = exchange.getIn().getBody(String.class); 
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, partitionKey.toString(), msg); 
producer.send(data); 

, bu bir seri hale sorundur ... aşağıdaki yok, ben sadece bir geçici çözüm olup olmadığından emin değilsiniz veya bu, mevcut uygulamada doğal olarak bir hata mıdır? (Ya da sadece benim yanlış anlamadığım)

Herhangi bir öneriniz var mı? Teşekkür, Teşekkürler,

cevap

10

Ah, burada gideceğimize aldırmayın ... Umarım bu bir başkasına yardımcı olur, serileştiriciyi seçenekler arasında ayarlamalısınız.

<route> 
      <from uri="file:///tmp/input" /> 
      <setHeader headerName="kafka.PARTITION_KEY"> 
       <constant>Test</constant> 
      </setHeader> 
      <to uri="kafka:localhost:9092?topic=test&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;groupId=group1&amp;serializerClass=kafka.serializer.StringEncoder" /> 
</route> 
0

yükleme ve Apache Kafka başlayan ve Kafka mesajı göndermek için bir deve bitiş noktası yapılandırmak için güzel bir örnek Bulundu Reference- Apache Camel + Kafka Integration example

@Override 
    public void configure() throws Exception { 

     String topicName = "topic=javainuse-topic"; 
     String kafkaServer = "kafka:localhost:9092"; 
     String zooKeeperHost = "zookeeperHost=localhost&zookeeperPort=2181"; 
     String serializerClass = "serializerClass=kafka.serializer.StringEncoder"; 

     String toKafka = new StringBuilder().append(kafkaServer).append("?").append(topicName).append("&") 
       .append(zooKeeperHost).append("&").append(serializerClass).toString(); 

     from("file:C:/inbox?noop=true").split().tokenize("\n").to(toKafka); 
    } 

topic-