2016-12-07 15 views
5

KafkaConsumer 0.10 Java API'yi kullanıyorum. Belirli bir bölümden ve belirli bir ofsetten tüketmek istiyorum. Yukarı baktım ve bir arama yöntemi olduğunu ancak bunun bir istisna attığını keşfettim. Benzer bir kullanım durumu veya çözümü olan var mı?KafkaConsumer 0.10 Java API hata iletisi: Bölüm için geçerli atama yok

Kodu:

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps); 
consumer.seek(new TopicPartition("mytopic", 1), 4); 

İstisna

java.lang.IllegalStateException: No current assignment for partition mytopic-1 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) 
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) 
    at xx.xxx.xxx.Test.main(Test.java:182) 

cevap

18

önce can seek() Bir konu veya tüketiciye bir konununassign() bölüm için subscribe() için ihtiyacınız. Ayrıca, subscribe() ve assign()'un tembel olduğunu da unutmayın; bu nedenle, seek()'u kullanmadan önce poll()'a "kukla çağrı" yapmanız da gerekir. Eğer subscribe() kullanırsanız

, grup yönetimini kullanın: Böylece, aynı group.id kullanarak birden tüketicileri başlayabilir ve konunun tüm bölümleri her bölüm bir tahsis alacak (otomatik grup içindeki tüm tüketiciler boyunca eşit atanacaktır gruptaki tek tüketici).

Belirli bölümleri okumak isterseniz, assign() aracılığıyla manuel atamayı kullanmanız gerekir. Bu, istediğiniz herhangi bir ödevi yapmanıza olanak tanır.

Btw: KafkaConsumer, örnekler de dahil olmak üzere çok uzun bir JavaDoc sınıfına sahiptir. Bunu okumak için değer.

+0

Teşekkürler. Çalıştı :)() ve seek() – colossal

+1

kombinasyonu ile çalıştım. "Application.id" yerine "group.id" demek istediniz. – automaticgiant

+0

Burada çok fazla #KafkaStream sorusunu yanıtlıyor ... @automaticgiant –

1

poll() kullanmak ve harita kayıtlarını almak ve ofsetin kendisini değiştirmek istemiyorsanız. Kafka sürüm 0.11 bu deneyin: koordinatör olaylar için

... 
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2")); 
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList()); 
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); 
coordinatorField.setAccessible(true);  

ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer); 
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings 
consumer.seekToBeginning(partitions); //or other seek 

anket. Bu, koordinatörün bilinmesini ve tüketicinin gruba katılmasını sağlar (grup yönetimi kullanıyorsa). Bu, etkinleştirildikleri takdirde periyodik offset işlemlerini de gerçekleştirir.