2016-11-24 25 views
7

Farklı kaynaklardan tam olarak aynı içeriği aktaran 2 Kafka konuğım var, bu yüzden kaynaklardan birinin başarısız olması durumunda yüksek kullanılabilirliğe sahip olabilirim. Kafka Streams 0.10.1.0 kullanarak 2 konuyu 1 çıktı konusunu birleştirmeye çalışıyorum, böylece hatalar ile ilgili hiçbir mesajı kaçırmıyorum ve tüm kaynaklar dolduğunda kopya yok.Birden fazla aynı Kafka Streams konularının birleştirilmesi

KStream'in leftJoin yöntemini kullanırken, konulardan biri sorun olmadan (ikincil konu) aşağı inebilir, ancak birincil konu azaldığında, çıktı konusuna hiçbir şey gönderilmez. Bu

KStream-KStream leftJoin daima birincil akımından gelen kayıtları tarafından tahrik edilmektedir, Kafka Streams developer guide göre, çünkü görünüyor

böylece

birincil akımından gelen hiçbir kayıt varsa, onu Mevcut olsalar bile ikincil akıştan kayıtları kullanmaz. Birincil akış tekrar çevrimiçi olduğunda, çıkış normal olarak devam eder.

Ben de

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

yinelenenlerin kurtulmak için KTable ve groupByKey için dönüşümle ardından outerJoin (yinelenen kayıtları ekler olan) kullanarak denedim ama hala arada bir çiftleri olsun. KTable'ın çıkış akımına sık sık gönderilmesini sağlamak için commit.interval.ms=200 kullanıyorum.

Bu birleştirme işlemine, birden çok özdeş giriş konusundan tam olarak bir kez çıktı almak için en iyi yol ne olurdu?

+0

Genel olarak, sorunu çözmek için Processor API'sini öneririm. Ayrıca mevcut 'trunk' sürümüne geçmeyi de deneyebilirsiniz (emin olmanız sizin için mümkün değildir). Katılanlar yeniden işlediler ve bu da sorununuzu çözebilir: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics Yeni birleştirme semantiği, Kafka'da 0.10.2'ye dahil edilecektir. Ocak 2017 hedef sürümüne sahip (https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan). –

+0

@ MatthiasJ.Sax Bagaja geçtim ve "leftJoin" şimdi KStream-KStream katılımları için bir 'outerJoin' gibi davranıyor gibi görünüyor, bu yüzden 10.1 semantiğine geri döneceğim. Şimdi denediğim şey, birincil olarak kullanılan bir solJoin'de birincil olarak kullanacağım boş değerleri çıkaran sahte bir akış oluşturmak ve bunu ikincil bir solJoin'de birleştirmek olan sahte bir akış oluşturmaktır. Umarım bu, birincil akışımda olsa bile, birincil akışta her zaman değerlere sahip olur (yalnızca ilk soldan boş bırakacağım gibi). –

+0

Yeni 'leftJoin', her iki taraftan da eski 'outerJoin '' in yaptığı gibi tetikliyor (sanırım şu anlama geliyor" solJoin şimdi bir dışJoin gibi davranıyor "gibi görünüyor?) - bu, eski 'leftJoin'den daha çok SQL semantiğine daha yakındır, fakat' leftJoin', 'outerJoin' için hala farklıdır: eğer sağ taraf tetikler ve bir birleştirme ortağı bulamazsa, kayıt düşer ve sonuç çıkmaz. . –

cevap

5

Her türlü katılımı kullanmak sorunu çözmeyecektir, çünkü her zaman eksik bir sonuçla sonuçlanacaktır (bazı akış duraklarında içeriye katılma) veya null ile "çoğaltır" (sol katılma veya dış katılma her iki akış da çevrimiçi durumdadır). Kafka Streams'de birleştirme semantiği ile ilgili ayrıntılar için bkz. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics.

Böylece, sana process(), transform() veya transformValues()KStream kullanılarak DSL ile-ve-maç karıştırabilirsiniz İşlemci API kullanmayı öneriyoruz. Daha fazla bilgi için bkz. How to filter keys and value with a Processor using Kafka Stream DSL.

Ayrıca çift filtreli hataya dayanıklı hale getirmek için işlemcinize özel bir depo da ekleyebilirsiniz (How to add a custom StateStore to the Kafka Streams DSL processor?).