2016-10-16 67 views

cevap

5

Bu mümkün.

RDD'de saveAsObjectFile ve saveAsTextFile işlevleriniz vardır. Tupller (value1, value2) olarak saklanır, böylece daha sonra ayrıştırabilirsiniz.

Okuma Yani ()

ortadan kaldırmak için textFile SparkContext gelen fonksiyonu ve daha sonra .map ile yapılabilir : Versiyon 1:

rdd.saveAsTextFile ("hdfs:///test1/"); 
// later, in other program 
val newRdds = sparkContext.textFile("hdfs:///test1/part-*").map (x => { 
    // here remove() and parse long/strings 
}) 

Versiyon 2:

rdd.saveAsObjectFile ("hdfs:///test1/"); 
// later, in other program - watch, you have tuples out of the box :) 
val newRdds = sparkContext.sc.sequenceFile("hdfs:///test1/part-*", classOf[Long], classOf[String]) 
+0

Sözü oluşturmak için bir örnektir. Ancak saveAsText birçok farklı dosya yapacağı için textFile'ı nasıl okuyoruz. – pythonic

+0

@pythonic Güncellememi görün - dosya aralığını okuyabilirsiniz. RDD'nin her bir parçası 'part-XYZŹŻ' dosyasına kaydedilir, bu yüzden bu ismin her bir dosyasını okuyabiliriz –

3

ben tavsiye ederim RDD'niz tablo biçiminde ise DataFrame'i kullanın. Bir veri çerçevesi, her bir sütunun bir değişken üzerinde ölçümler içerdiği ve her bir satırın bir vaka içerdiği bir tablo veya iki boyutlu dizi benzeri bir yapıdır. a DataFrame'in, Tabular formatı nedeniyle ek meta verileri vardır; bu, Spark'un sonlandırılmış sorguda belirli optimizasyonları çalıştırmasına izin verir. Burada bir RDD, daha fazla bir kara kutu veya optimize edilemeyen veri çekirdek çekirdeği olan Esnek Dağıtılmış Veri Kümesidir. Ancak, DataFrame'den bir RDD'ye veya tam tersine gidebilir ve bir RDD'den DataFrame'e (RDD bir tabular biçiminde ise), DF yöntemiyle gidebilirsiniz.

aşağıdaki

, bu :) düzgün bir çözüm/deposu HDF'ler CSV ve Parke biçiminde bir DataFrame,

val conf = { 
    new SparkConf() 
    .setAppName("Spark-HDFS-Read-Write") 
} 

val sqlContext = new SQLContext(sc) 

val sc = new SparkContext(conf) 

val hdfs = "hdfs:///" 
val df = Seq((1, "Name1")).toDF("id", "name") 

// Writing file in CSV format 
df.write.format("com.databricks.spark.csv").mode("overwrite").save(hdfs + "user/hdfs/employee/details.csv") 

// Writing file in PARQUET format 
df.write.format("parquet").mode("overwrite").save(hdfs + "user/hdfs/employee/details") 

// Reading CSV files from HDFS 
val dfIncsv = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(hdfs + "user/hdfs/employee/details.csv") 

// Reading PQRQUET files from HDFS 
val dfInParquet = sqlContext.read.parquet(hdfs + "user/hdfs/employee/details")