2016-04-12 35 views
4

kafka-python-1.0.2 Kullanımı.kafka-python - Bir bölümü nasıl işlerim?

10 bölümlü bir konuğum varsa, çeşitli bölümler ve iletiler arasında döngü yaparken belirli bir bölümün işlenmesiyle ilgili olarak nasıl giderim. Bn sadece docs, bu her yerde bir örneğini bulmak görünmek veya başka

Dokümanlar

, kullanmak istediğim:

consumer.commit(offset=offsets) Özellikle

, ben bölümü ve OffsetAndMetadata sözlüğünü oluşturmak nasıl gerekli ofsetler için (dict, isteğe bağlı) - {TopicPartition: OffsetAndMetadata}.

ben işlev çağrısı gibi bir şey olacağını umuyordum:

consumer.commit(partition, offset)

ancak bu durum görünmüyor.

Şimdiden teşekkürler.

cevap

3

Anladığım kadarıyla, sanırım sorularınızı yazarken bu nasıl oluyor da komik. Bu işe yarıyor:

meta = consumer.partitions_for_topic(topic) 
options = {} 
options[partition] = OffsetAndMetadata(message.offset, meta) 
consumer.commit(options) 

Daha fazla sınama gerekli, ancak bir şey değişirse güncellenir.

+0

Bundan sonra bir sorun mu var? Ben de aynısını yapmak istiyorum. –

+1

Bunu yapmanın yolu budur, GitLab'deki kafka ekibine ulaştım. Yanıt: "metadata gerçekten sadece opak bir dizedir. Ayrıca hiçbirini geçemezsiniz. Hiçbir şey meta verileri dahili olarak kullanır, gerekirse uygulamaya özgü verileri depolamanız için bir yoldur. Ancak çok az kişi bu işlevi kullanır. Bu yola giderseniz dikkatli olun –

+0

Bu thread için link: https://github.com/dpkp/kafka-python/issues/645 –

2

Meta verileri kullanmak gerekmez. Bu yardımcı olur

from kafka import TopicPartition 
from kafka.structs import OffsetAndMetadata 
... 
topic = 'your_topic' 
partition = 0 
tp = TopicPartition(topic,partition) 
kafkaConsumer = createKafkaConsumer() 
kafkaConsumer.assign([tp]) 
offset = 15394125 
kafkaConsumer.commit({ 
    tp: OffsetAndMetadata(offset, None) 
}) 

Hope: bu örneği bakın. bunlardan her biri için bir tane atamanız gerekir ve daha sonra ofset seti, daha o bir bölüm varsa

1
from kafka import KafkaConsumer 
from kafka import TopicPartition 

TOPIC = "test_topic" 
PARTITION = 0 

consumer = KafkaConsumer(
    group_id=TOPIC, 
    auto_offset_reset="earliest", 
    bootstrap_servers="localhost:9092", 
    request_timeout_ms=100000, 
    session_timeout_ms=99000, 
    max_poll_records=100, 
) 
topic_partition = TopicPartition(TOPIC, PARTITION) 
# format: topic, partition 
consumer.assign([topic_partition]) 
consumer.seek(topic_partition, 1660000) 
# format: TopicPartition, offset. 1660000 is the offset been set. 
for message in consumer: 
    # do something 
  1. Bu yalnızca, o bölüm için ofset bir bölüm ve setleri atar.
  2. aalmeida88'nin cevabı bazen benim için işe yarıyor, bazı durumlarda, işe yarıyor ve aalmeida88 bana fikir vermemi sağladı ve aynı zamanda kullanışlı bir yöntem gibi görünüyor.
  3. Dikkat etmeniz gereken başka bir şey, bölümleri kendiniz atadığınızda, kafka yöneticisinin tüketici bilgilerini alamadığı anlaşılıyor, bunun nedeni bölümler atadığınızda, bunun yerine kafo'ya zookeeper yerine ayarlamanızdır. kafka yöneticisi bu bilgiyi alamayabilir. Umut eder!

--- düzenlemek -----

bunu yapmak için daha iyi bir yol bulun.

topic_partition = TopicPartition(TOPIC, 
           message.partition) 
consumer.seek(topic_partition, offset_value) 
consumer.commit() 

Bu

dolayısıyla orada (nadir değildir) ofset birden fazla bölümleri olan programda ayarlanması gerektiğinde kolaylık getiriyor, Kafka alınan mesajdan bölüm bilgi ayıklamak ve manuel bölüm atamak maddesini kurtaracak.

ps: Bir bölümün yalnızca bir kez ayarlandığından emin olmak için uygulamanıza göre bir bayrak ayarlanmalıdır.