2015-07-02 17 views
7

Yerel bir klasörden veri aldığım ve her dönüşümü gerçekleştirdiğim klasöre yeni bir dosya bulduğum kıvılcım 1.2.0 ile kıvılcım akışı ortamım var. DStream verilere benim analizi gerçekleştirmek amacıylaBir kıvılcım akışı içeriğinde HDFS'ye bir RDD yazın

val ssc = new StreamingContext(sc, Seconds(10)) 
val data = ssc.textFileStream(directory) 

Ben

var arr = new ArrayBuffer[String](); 
    data.foreachRDD { 
    arr ++= _.collect() 
} 

Sonra istediğim bilgiyi ayıklamak ve HDF'ler üzerine onları kurtarmak için elde edilen verileri kullanmak bir Array haline dönüştürmek zorundayız.

val myRDD = sc.parallelize(arr) 
myRDD.saveAsTextFile("hdfs directory....") 

Gerçekten (ince çalışacak olan) DStream.saveAsTextFiles("...") ile HDF'ler verileri kaydetmek imkansız bir Array ile verileri işlemek için gerektiğinden ve RDD kurtarmak zorunda ama bu preocedure ile nihayet adında boş çıktı dosyaları var bölüm-00000 vb ...

Bir arr.foreach(println) ile Transofmasyonların doğru sonuçlarını görebiliyorum.

Şüphelim, bu kıvılcım, daha önce yazılanları silerek, aynı dosyada veri yazmak için her parti üzerinde çalışır. myRDD.saveAsTextFile("folder" + System.currentTimeMillis().toString()) gibi bir dinamik adlandırılmış klasöre kaydetmeye çalıştım, ancak her zaman yalnızca bir kat oluşturulur ve çıktı dosyaları hala boş.

Bir kıvılcım akışı bağlamında HDFS'ye nasıl bir RDD yazabilirim?

+0

Ben sorun varış bütün işçilerin mevcut olmamasıdır tahmin: Kıvılcım 1.5+ dataframes API bu özellik vardır. Arrınızı yayınlamaya çalışıp sonunda HDF'ye yazmayı denediniz mi? –

+0

Çünkü bir klasörü izlemem gerekiyor ve yüklenen tüm yeni dosyaları kesip akan seslere iyi bir çözüm gibi geliyor. Tek bir makine değil, 2 makine grubu. Şimdi sadece dosyaları metin olarak yazıyorum, ancak gelecekte parke dosyaları yazmak zorunda kalacağım ve Spark – drstein

+0

ile oldukça basit bir şekilde deneyebilecek misiniz? var arr = new ArrayBuffer [String](); val yayınlanan = sc.broadcast (arr) data.foreachRDD { yayınlanan ++ = _.collect() } val myRDD = sc.parallelize (yayınlanan) myRDD.saveAsTextFile ("HDF'ler dizini ...." –

cevap

5

Spark Streaming'i, tasarlanmadığı bir şekilde kullanıyorsunuz. Kullanım durumunuz için Spark'i kullanarak ya da kodunuzu uyarlamanızı öneririz. Diziyi sürücüye toplamak, dağıtılmış bir motoru kullanmanın amacını ortadan kaldırır ve uygulamanızı tek makineli olarak etkin hale getirir (iki makine aynı zamanda verileri tek bir makinede işlemekten çok daha fazla çalışmaya neden olur).

Bir dizi ile yapabileceğiniz her şey Spark ile yapabilirsiniz. Bu yüzden, iş akışlarınızı, işyerinde dağıtılan akış içinde çalıştırın ve çıktınızı DStream.saveAsTextFiles() kullanarak yazın. Tek bir dosyaya yazmak için foreachRDD + saveAsParquet(path, overwrite = true)'u kullanabilirsiniz.

+0

kullanmak zorunda sanırım Teşekkürler, ben kesinlikle senin noktası olsun, DStream kullanmak için trasform mantığını değiştirmeye çalışacağım. Her dosyada kıvılcım akışı için kayıtların aynı dosyada kaydedilip kaydedilmediğini biliyor musunuz? Şu an her parti aralığındaki yeni dosyalarla yeni bir klasör buluyorum. – drstein

+1

Evet, foreachRDD + saveAsParquet ile üzerine yazmak için bir seçenek var. –

+0

@MariusSoutier, bu konuda bana yardım edebilir misiniz? Http: // stackoverflow.com/questions/39363586/issue-while-data-from-spark-streaming-to-cassanadra' – Naresh

2

@vzamboni:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);