2016-04-18 36 views
7

Kafka 0.0.8V ile Apache avro şemasını kullanır. Ben üretici/tüketici uçlarında aynı şemayı kullanın. Şemada NO HERHANGİ Değişiklikler var. Ama ben mesajları tüketmeye çalıştığımda tüketiciye bir istisna getiriyorum. Bu hatayı neden aldım?Avro kod çözme, java.io.EOFException

Üretici

public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException { 
    BinaryEncoder encoder = null; 
    ByteArrayOutputStream out = null; 
    try { 
     DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema); 
     out = new ByteArrayOutputStream(); 
     encoder = EncoderFactory.get().binaryEncoder(out, null); 
     writer.write(payload, encoder); 
     encoder.flush(); 

     byte[] serializedBytes = out.toByteArray(); 

     KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes); 

      producer.send(message); 
     } 

Tüketici

public void run() { 
     try { 
      ConsumerIterator<byte[], byte[]> itr = stream.iterator(); 
      while (itr.hasNext()) { 

       byte[] data = itr.next().message(); 

       Schema schema = new Schema.Parser() 
         .parse(new File("/Users/xx/avro_schemas/file.avsc")); 

       DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
       Decoder decoder = DecoderFactory.get().binaryDecoder(data, null); 

       GenericRecord payload = reader.read(null, decoder); 
       System.out.println("Message received --: " + payload); 

Ama okuyucu dekoderi bir mesajın okunması çalıştığımda istisna aşağıdaki almak .;

java.io.EOFException 
    at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473) 
    at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128) 
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259) 
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363) 
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157) 
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193) 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) 
    at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Tüketici özellikleri

enable.auto.commit=true 
auto.commit.interval.ms=101 
session.timeout.ms=7000 
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 
zookeeper.connect=zookeeper.xx.com\:2181 
heartbeat.interval.ms=1000 
auto.offset.reset=smallest 
serializer.class=kafka.serializer.DefaultEncoder 
bootstrap.servers=kafka.xx.com\:9092 
group.id=test 
consumer.timeout.ms=-1 
fetch.min.bytes=1 
receive.buffer.bytes=262144 

cevap

1

Sorun AVRO yapımcı tarafından üretilir.

SendFile() yönteminde, kodlayıcıyı temizlemiyorsanız ve EOFException'a neden olan ByteArrayOutputStream() yöntemini kapatmıyorsunuz demektir.

public class TestSerializer<T> { 



    final private Class<T> avroType; 

    public TestSerializer(Class<T> avroType) { 
     this.avroType = avroType; 
    } 

    public byte[] serialize(T object) 
    { 
     ByteArrayOutputStream out = new ByteArrayOutputStream(); 
     BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
     DatumWriter<T> writer = new SpecificDatumWriter<T>(avroType); 
     try 
     { 
      writer.write(object, encoder); 
      out.close(); 
     } catch (IOException e) 
     { 
      throw new RuntimeException(e); 
     } finally 
     { 
      //Here is the flushing and closing 
      try 
      { 
       if (encoder != null) 
       { 
        encoder.flush(); 
       } 
       if (out != null) 
       { 
        out.close(); 
       } 
      } catch (IOException e) 
      { 
       throw new RuntimeException(e); 
      } 
     } 

     return out.toByteArray(); 

    } 

} 
: Burada

bir jenerik seri sınıfının bir örneği var