10
Flink Akış ile bir akıştaki benzersiz kelimelerin sayısını saymanın bir yolu var mı? Sonuçlar, artmaya devam eden bir sayı akışı olacaktır.Akışta benzersiz kelimeler nasıl sayılır?
Flink Akış ile bir akıştaki benzersiz kelimelerin sayısını saymanın bir yolu var mı? Sonuçlar, artmaya devam eden bir sayı akışı olacaktır.Akışta benzersiz kelimeler nasıl sayılır?
Sorunu, daha önce gördüğünüz tüm kelimeleri saklayarak çözebilirsiniz. Bu bilgiye sahip olmak, tüm yinelenen kelimeleri filtreleyebilirsiniz. Gerisi daha sonra 1
paralellikli bir harita operatörü tarafından sayılabilir. Aşağıdaki kod parçacığı tam olarak bunu yapar.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo")
// filter words out which we have already seen
val uniqueWords = inputStream.keyBy(x => x).filterWithState{
(word, seenWordsState: Option[Set[String]]) => seenWordsState match {
case None => (true, Some(HashSet(word)))
case Some(seenWords) => (!seenWords.contains(word), Some(seenWords + word))
}
}
// count the number of incoming (first seen) words
val numberUniqueWords = uniqueWords.keyBy(x => 0).mapWithState{
(word, counterState: Option[Int]) =>
counterState match {
case None => (1, Some(1))
case Some(counter) => (counter + 1, Some(counter + 1))
}
}.setParallelism(1)
numberUniqueWords.print();
env.execute()
Gelen akış "sonsuz" ise ve dizge kümesi (filterWithState 'içinde) çok büyük olduğunda, OOM veya performans düşüşüne neden olabilir mi? –
Çekirdek dışında kalan bir durum arka uç kullanıyorsanız. 'RocksDBStateBackend', bir devlet arka planıdır. Bellek durumu arka ucunu kullanırsanız, bir süre sonra durumu temizlemeniz gerekir, aksi halde OOM'a girebilirsiniz. –
Yine de bir soru, bu durumda RocksDBStateBackend'in arka ucundaki kaydetme/geri yükleme işlemlerini anladığım gibi, karmaşıklık O (N), burada N kümedeki elemanların sayılmasıdır, yani bu arka uç her zaman Set öğesinin tüm öğelerini kaydeder/geri yükler. değiştirilmiş elemanlar? –