2016-03-31 30 views

cevap

7

Sorunu, daha önce gördüğünüz tüm kelimeleri saklayarak çözebilirsiniz. Bu bilgiye sahip olmak, tüm yinelenen kelimeleri filtreleyebilirsiniz. Gerisi daha sonra 1 paralellikli bir harita operatörü tarafından sayılabilir. Aşağıdaki kod parçacığı tam olarak bunu yapar.

val env = StreamExecutionEnvironment.getExecutionEnvironment 

val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo") 

// filter words out which we have already seen 
val uniqueWords = inputStream.keyBy(x => x).filterWithState{ 
    (word, seenWordsState: Option[Set[String]]) => seenWordsState match { 
    case None => (true, Some(HashSet(word))) 
    case Some(seenWords) => (!seenWords.contains(word), Some(seenWords + word)) 
    } 
} 

// count the number of incoming (first seen) words 
val numberUniqueWords = uniqueWords.keyBy(x => 0).mapWithState{ 
    (word, counterState: Option[Int]) => 
    counterState match { 
     case None => (1, Some(1)) 
     case Some(counter) => (counter + 1, Some(counter + 1)) 
    } 
}.setParallelism(1) 

numberUniqueWords.print(); 

env.execute() 
+0

Gelen akış "sonsuz" ise ve dizge kümesi (filterWithState 'içinde) çok büyük olduğunda, OOM veya performans düşüşüne neden olabilir mi? –

+1

Çekirdek dışında kalan bir durum arka uç kullanıyorsanız. 'RocksDBStateBackend', bir devlet arka planıdır. Bellek durumu arka ucunu kullanırsanız, bir süre sonra durumu temizlemeniz gerekir, aksi halde OOM'a girebilirsiniz. –

+0

Yine de bir soru, bu durumda RocksDBStateBackend'in arka ucundaki kaydetme/geri yükleme işlemlerini anladığım gibi, karmaşıklık O (N), burada N kümedeki elemanların sayılmasıdır, yani bu arka uç her zaman Set öğesinin tüm öğelerini kaydeder/geri yükler. değiştirilmiş elemanlar? –