2015-08-08 20 views
19

içinde csv dosyasına nasıl yazılır? Sonuçta elde ettiğim RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions).Ortaya çıkan RDD, Spark python

[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....] 

İstediğim bir CSV bir labels için sütuna (yukarıdaki çıktıda başlığın ilk parçası) ve predictions diğeri (tanımlama grubu çıkışının ikinci bölümü) dosyasını oluşturmaktır: Bu, bu formatta çıkışı . Ama Spark'deki Python'u kullanarak bir CSV dosyasına nasıl yazacağımı bilmiyorum.

Yukarıdaki çıktı ile nasıl bir CSV dosyası oluşturabilirim?

cevap

30

dizeleri içine RDD (labelsAndPredictions) Sadece map hatları (CSV hat) ardından rdd.saveAsTextFile() kullanın.

def toCSVLine(data): 
    return ','.join(str(d) for d in data) 

lines = labelsAndPredictions.map(toCSVLine) 
lines.saveAsTextFile('hdfs://my-node:9000/tmp/labels-and-predictions.csv') 
+0

Dizeler nasıl eşlenir? "ToLine (data)" içine ne yazacağım? –

+0

Üzgünüm, Python'u bildiğini sanıyordum. Cevabı ekledim. –

+0

Sadece bir şüphe, bu yüzden bu csv' dosyasını kaydedecek? Kodun bulunduğu dizinde? Diğer dizine kaydedebilir miyim ('saveAsTextFile '('/home/files/label-and-predictions.csv ')' kullanarak)? –

6

Bu alanların virgül içeriyorsa, bunların düzgün alıntı olmayacak çünkü sadece virgülle katılmak için iyi değil, mesela ','.join(['a', 'b', '1,2,3', 'c']), a,b,"1,2,3",c'u ne zaman isterseniz a,b,1,2,3,c verir. csv modülü yalnızca nesneleri dosyaya yazar beri boş "Dosya" ile oluşturmak zorunda,

# python 3 
import csv, io 

def list_to_csv_str(x): 
    """Given a list of strings, returns a properly-csv-formatted string.""" 
    output = io.StringIO("") 
    csv.writer(output).writerow(x) 
    return output.getvalue().strip() # remove extra newline 

# ... do stuff with your rdd ... 
rdd = rdd.map(list_to_csv_str) 
rdd.saveAsTextFile("output_directory") 

: Bunun yerine, bir şekilde biçimlendirilmiş bir CSV dizeye RDD her listesini dönüştürmek için Python'un csv modülü kullanmalıdır io.StringIO("") ve csv.writer 'e csv formatlı dizeyi yazmasını söyle. Daha sonra, "dosyaya" yazdığımız dizgiyi almak için output.getvalue() kullanıyoruz. Bu kodun Python 2 ile çalışmasını sağlamak için io'yu StringIO modülüyle değiştirin.

Eğer Spark DataFrames API'sini kullanıyorsanız, ayrıca csv formatına sahip DataBricks save function'a da bakabilirsiniz.

+0

Bu kodu kullanarak bir TypeError alıyorum. TypeError: Metin akışına str yazamaz. –

+0

@Moe Chughtai Spark/Python hangi sürümünü kullanıyorsunuz? Hangi satır size tür hatası verir ve hangi girdide? –

10

Bunun eski bir yazı olduğunu biliyorum. Ama burada PySpark tek CSV dosyasına iki sütunlu RDD yazmak nasıl 1.6.2

RDD var, aynı ararken birine yardım etmek:

>>> rdd.take(5) 
[(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')] 

Şimdi kod:

# First I convert the RDD to dataframe 
from pyspark import SparkContext 
df = sqlContext.createDataFrame(rdd, ['count', 'word']) 

DF:

>>> df.show() 
+-----+-----------+ 
|count|  word| 
+-----+-----------+ 
|73342|  cells| 
|62861|  cell| 
|61714| studies| 
|61377|  aim| 
|60168| clinical| 
|59275|   2| 
|59221|   1| 
|58274|  data| 
|58087|development| 
|56579|  cancer| 
|50243| disease| 
|49817| provided| 
|49216| specific| 
|48857|  health| 
|48536|  study| 
|47827| project| 
|45573|description| 
|45455| applicant| 
|44739| program| 
|44522| patients| 
+-----+-----------+ 
only showing top 20 rows 

Şimdi CSV

yazma
# Write CSV (I have HDFS storage) 
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out') 

P.S: Ben sadece Stackoverflow'taki yazılardan öğrenen yeni başlayan biriyim. Yani bunun en iyi yol olup olmadığını bilmiyorum. Ama benim için çalıştı ve umarım birilerine yardım eder!

+0

Bu benim için çalışan çözüm. Şerefe! – Indra

+0

Bazı sütunları/JSON kodlamak zorunda kaldım, ancak aksi halde çalışır –