Kafka 0.0.8V ile Apache avro şemasını kullanır. Ben üretici/tüketici uçlarında aynı şemayı kullanın. Şemada NO HERHANGİ Değişiklikler var. Ama ben mesajları tüketmeye çalıştığımda tüketiciye bir istisna getiriyorum. Bu hatayı neden aldım?Avro kod çözme, java.io.EOFException
Üretici
public void sendFile(String topic, GenericRecord payload, Schema schema) throws CoreException, IOException {
BinaryEncoder encoder = null;
ByteArrayOutputStream out = null;
try {
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<GenericRecord>(schema);
out = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(payload, encoder);
encoder.flush();
byte[] serializedBytes = out.toByteArray();
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);
producer.send(message);
}
Tüketici
public void run() {
try {
ConsumerIterator<byte[], byte[]> itr = stream.iterator();
while (itr.hasNext()) {
byte[] data = itr.next().message();
Schema schema = new Schema.Parser()
.parse(new File("/Users/xx/avro_schemas/file.avsc"));
DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
GenericRecord payload = reader.read(null, decoder);
System.out.println("Message received --: " + payload);
Ama okuyucu dekoderi bir mesajın okunması çalıştığımda istisna aşağıdaki almak .;
java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:259)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
at com.xx.KafkaMessageListenerThread.run(KafkaMessageListenerThread.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Tüketici özellikleri
enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.connect=zookeeper.xx.com\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.DefaultEncoder
bootstrap.servers=kafka.xx.com\:9092
group.id=test
consumer.timeout.ms=-1
fetch.min.bytes=1
receive.buffer.bytes=262144