2017-12-15 210 views
6

Java ile yazılmış ve Spark 2.1 kullanan bir Spark akış uygulamasına sahibim. Kafka'nın mesajlarını okumak için KafkaUtils.createDirectStream kullanıyorum. Kafka mesajları için kryo encoder/decoder kullanıyorum. Bunu Kafka özelliklerinde belirttim-> key.deserializer, value.deserializer, key.serializer, value.deserializer

Spark, mesajları bir mikro yığında çektiğinde, kryo dekoderi kullanılarak mesajlar başarıyla çözülür. Ancak, Spark uygulayıcısının kafka'dan okunan her mesajın kodunu çözmek için yeni bir kryo kod çözücü örneği oluşturduğunu fark ettim. Kod çözücü kurucusunun içindeki günlükleri

içine koyarak kontrol ettim. Bu bana garip geliyor. Her mesaj ve her parti için aynı kod çözücü örneği kullanılmamalıdır mı?Kafka Direct Stream her mesaj için neden yeni bir kod çözücü yaratıyor?

Kod ben kafka dan okuyorum burada:

biz Kıvılcım içten Kafka veri getirir nasıl görmek istiyorsanız
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams)); 

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> { 
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value()); 
}); 

cevap

3

, her RDD için uygulanan bir yöntem olan KafkaRDD.compute bakmak gerekir hangi

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { 
    val part = thePart.asInstanceOf[KafkaRDDPartition] 
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) 
    if (part.fromOffset == part.untilOffset) { 
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + 
    s"skipping ${part.topic} ${part.partition}") 
    Iterator.empty 
    } else { 
    new KafkaRDDIterator(part, context) 
    } 
} 

burada önemli bir KafkaRDDIterator yaratır else fıkra vardır: iyi, RDD olduğunu hesaplamak için nasıl bir çerçeve söyler. Bu içten vardır: Gördüğünüz gibi her yatan bölüm için, anahtar dekoderi ve yansıma yoluyla değer dekoderi hem bir örneğini oluşturur

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[K]] 

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[V]] 

. Bu, no'lu mesaj başına , Kafka bölüm için no'lu olarak üretilmediği anlamına gelir.

Neden bu şekilde uygulanır? Bilmiyorum. Bir anahtar ve değer kod çözücüsünün Spark içinde gerçekleşen tüm diğer tahsislere kıyasla negatif bir performans vuruşu olması gerektiğinden dolayı varsayıyorum.

Uygulamanızı profil oluşturduysanız ve bunu bir dağıtım yolu olarak bulduysanız, bir sorunla karşılaşabilirsiniz. Aksi takdirde, bunun için endişelenmeyeceğim.

+0

Çok iyi araştırılmış! #impressed –

+0

@Yuval: Kafka 0.10.x kullanıyorum. Spark, önbellek anahtarının tüketici kimliği, konu kimliği, bölüm kimliği ile tanımlandığı önbelleğe alınan kafka tüketicilerini (yürütücü başına) kullanır. Kafka bölmesi başına bir kod çözücüye sahip olmak ya da Spark'in mesajları paralel olarak nasıl çözmesi mantıklıdır. Beklediğim şey, önbelleğe alınmış bir tüketici içindeki bölüm başına bir kez yeni bir kod çözücü oluşturulmalı ve bu kadar! Bu problemi hafif bir yük altında görmüyorum ama sadece saniyeler içinde 1000 mesaj ilettiğimde. Muhtemelen bir "GC" döngüsüne koşuyorum. KafkaRDD sınıfında günlüğün nasıl etkinleştirileceğine dair bir fikrin var mı? – scorpio

+0

@scorpio Kafka 0.10.x, bir kod çözücü gerektirmez. Altta yatan 'ConsumerRecord' değerini döndürür ve bununla ne yapacağınızı seçersiniz. Belki bir 'harita' içinde bir kod çözücü örneği oluşturuyor musunuz? –