2015-10-17 26 views
7

Kafka Queue'dan seri hale getirilmiş avro olayları tüketmeye çalışıyorum. Kafka kuyruğu, basit bir java üreticisi kullanılarak doldurulur.Kafka ile Logstash: avro için çözülemiyor

Avro şema dosyası

{"namespace": "example.avro", 
"type": "record", 
"name": "User", 
"fields": [ 
    {"name": "name", "type": "string"}, 
    {"name": "favorite_number", "type": ["int", "null"]}, 
    {"name": "favorite_color", "type": ["string", "null"]} 
] 
} 

Java Üretici kod parçacığı (User.class kullanılarak üretilir avro-araçları)

User user1 = new User(); 
    user1.setName("Alyssa"); 
    user1.setFavoriteNumber(256); 
    user1.setFavoriteColor("blue"); 
    String topic = "MemoryTest"; 

    // Properties set in 'props' 
    KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props); 

    ByteArrayOutputStream out = new ByteArrayOutputStream(); 
    DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class); 
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
    writer.write(user1, encoder); 
    encoder.flush(); 
    out.close(); 
    byte[] serializedBytes = out.toByteArray(); 
    producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes)); 

: Anlaşılır olması için ben üç bileşeni paylaşıyorum Logstash Yapılandırma dosyası

input { 
     kafka { 
       zk_connect => "localhost:2181" 
       topic_id => "MemoryTest" 
       type => "standard_event" 
       group_id => "butiline_dash_prod" 
     reset_beginning => true 
     auto_offset_reset => smallest 
     codec => { 
       avro => { 
        schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
       } 
      } 
     } 
} 

output { 
    stdout { 
    codec => rubydebug 
    } 
} 

Sorun

boru hattı logstash düzeyinde başarısız olur. Yeni bir olay Kafka'ya aktarıldığında, logstash konsolunu takip ediyorum:

Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error} 

cevap

10

Sonunda hatayı anladım. Ben sözdizimi değiştirilir tahmin

codec => avro { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
} 

: - Yerine (Logstash web sitesinde önerildiği gibi https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html) Bunun (eklentilerin sayfalarına https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md önerildiği üzere)

codec => { 
    avro => { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
    } 
} 

doğru sözdizimi olduğunu.