Aşağıda, 30 saniyenin bir pencere boyutu ve 10 saniyelik slayt boyutu üzerinde kelime sayımı elde etmek için basit bir kod bulunmaktadır.Spark Akış Penceresi Çalışması
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming.api._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(5))
// read from text file
val lines0 = ssc.textFileStream("test")
val words0 = lines0.flatMap(_.split(" "))
// read from socket
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words1 = lines1.flatMap(_.split(" "))
val words = words0.union(words1)
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
wordCounts.print()
ssc.checkpoint(".")
ssc.start()
ssc.awaitTermination()
Ancak, bu hattan hata alıyorum:
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
. Özellikle, _ + _
'dan. Hata, sorunun ne olduğunu söyleyebilir mi? Teşekkürler!
teşekkür ederiz açıklanmıştır! Değişimden sonra, program beklenen sonuçları veriyordu ama bu arada başka bir hata verdi: java.util.NoSuchElementException: anahtar bulunamadı: 140605186000000 ms \t scala.collection.MapLike $ class.default (MapLike.scala) : 228) scala.collection.AbstractMap.default de \t (Map.scala: 58) scala.collection.mutable.HashMap.apply (HashMap.scala de \t: org.apache.spark.streaming 64) \t. dstream.ReceiverInputDStream.getReceivedBlockInfo (ReceiverInputDStream.scala: 77) Acaba bu nasıl oldu? – user2895478
@ user2895478 Bu [Jira bileti] 'nin (https://issues.apache.org/jira/browse/SPARK-2009) sorununun 1.0.1 ve 1.1.0'da çözüldüğüne inanıyorum – aaronman