2016-07-18 23 views
8

Java kullanarak kafka tüketici yazıyorum. Mesajın gerçek zamanını tutmak istiyorum, bu yüzden tüketmek için bekleyen çok fazla mesaj varsa, 1000 veya daha fazlası gibi, tükenmemiş mesajları terk etmeli ve en son mahsupdan tüketmeye başlamalıyım.Bir kafka konusundaki LATEST ofsetini nasıl alabilirim?

Bu sorun için, son yapılan ofset ve bir konunun en son ofsetini karşılaştırmaya çalışıyorum (yalnızca 1 bölüm), eğer bu iki ofset arasındaki fark belirli bir miktardan büyükse, en son ofset değerini ayarlayacağım. Bir sonraki ofset olarak konu bu gereksiz mesajları bırakabilirim.

Şimdi benim problemim, bir konunun en yeni ofsetini elde etmektir, bazı insanlar eski tüketiciyi kullanabileceğimi söylüyor, ama çok karmaşık, yeni tüketici bu işleve sahip mi?

cevap

12

Yeni tüketici de karmaşıktır.

//assign the topic consumer.assign();

Kafka sürümü için

//seek to end of the topic consumer.seekToEnd();

//the position is the latest offset consumer.position();

+0

İstemcinizdeki güncel ofset ve en son bilinen kafka konu ofset arasındaki farkı hesaplamak istiyorsanız bu işe yaramaz! – hiaclibe

5

: pasajı Üstü 0.10.1.1

// Get the diff of current position and latest offset 
Set<TopicPartition> partitions = new HashSet<TopicPartition>(); 
TopicPartition actualTopicPartition = new TopicPartition(record.topic(), record.partition()); 
partitions.add(actualTopicPartition); 
Long actualEndOffset = this.consumer.endOffsets(partitions).get(actualTopicPartition); 
long actualPosition = consumer.position(actualTopicPartition);   
System.out.println(String.format("diff: %s (actualEndOffset:%s; actualPosition=%s)", actualEndOffset -actualPosition ,actualEndOffset, actualPosition)); 
0
KafkaConsumer<String, String> consumer = ... 
consumer.subscribe(Collections.singletonList(topic)); 
TopicPartition topicPartition = new TopicPartition(topic, partition); 
consumer.poll(0); 
consumer.seekToEnd(Collections.singletonList(topicPartition)); 
long currentOffset = consumer.position(topicPartition) -1; 

belirli bir konu ve bölüm için ofset akım kararlı mesajı döndürür numara.