2016-11-30 31 views
10

Spark 1.6 ile birlikte CDH'dir. CSV'yi zaman damgası ve tarih türlerine sahip bir Spark Dataframe'e okuma

Bir apache Kıvılcım DataFrame içine bu Varsayımsal CSV içe çalışıyorum:

$ hadoop fs -cat test.csv 
a,b,c,2016-09-09,a,2016-11-11 09:09:09.0,a 
a,b,c,2016-09-10,a,2016-11-11 09:09:10.0,a 

Ben databricks-CSV kavanoz kullanın.

val textData = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("delimiter", ",") 
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss") 
    .option("inferSchema", "true") 
    .option("nullValue", "null") 
    .load("test.csv") 

Sonuç Dataframe için şema yapmak için inferSchema kullanıyorum. printSchema() işlevi, yukarıda kodunu bana aşağıdaki çıktıyı verir:

scala> textData.printSchema() 
root 
|-- C0: string (nullable = true) 
|-- C1: string (nullable = true) 
|-- C2: string (nullable = true) 
|-- C3: string (nullable = true) 
|-- C4: string (nullable = true) 
|-- C5: timestamp (nullable = true) 
|-- C6: string (nullable = true) 

scala> textData.show() 
+---+---+---+----------+---+--------------------+---+ 
| C0| C1| C2|  C3| C4|     C5| C6| 
+---+---+---+----------+---+--------------------+---+ 
| a| b| c|2016-09-09| a|2016-11-11 09:09:...| a| 
| a| b| c|2016-09-10| a|2016-11-11 09:09:...| a| 
+---+---+---+----------+---+--------------------+---+ 

C3 kolon Yaylı türü vardır. C3'ün tarih türüne sahip olmasını istiyorum. Tarih tipine ulaşmak için aşağıdaki kodu denedim. mm:

val textData = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("delimiter", ",") 
    .option("dateFormat", "yyyy-MM-dd") 
    .option("inferSchema", "true") 
    .option("nullValue", "null") 
    .load("test.csv") 

scala> textData.printSchema 
root 
|-- C0: string (nullable = true) 
|-- C1: string (nullable = true) 
|-- C2: string (nullable = true) 
|-- C3: timestamp (nullable = true) 
|-- C4: string (nullable = true) 
|-- C5: timestamp (nullable = true) 
|-- C6: string (nullable = true) 

scala> textData.show() 
+---+---+---+--------------------+---+--------------------+---+ 
| C0| C1| C2|     C3| C4|     C5| C6| 
+---+---+---+--------------------+---+--------------------+---+ 
| a| b| c|2016-09-09 00:00:...| a|2016-11-11 00:00:...| a| 
| a| b| c|2016-09-10 00:00:...| a|2016-11-11 00:00:...| a| 
+---+---+---+--------------------+---+--------------------+---+ 

bu kodu ve ilk blok arasındaki tek fark, DateFormat seçenek hattıdır (yerine arasında "yyyy-AA-gg SS "yyyy-AA-gg" kullanımı ss ") .Şimdi C3 ve C5'i zaman damgası olarak aldım (C3 hala tarih değil). Ancak C5 için, HH :: mm: ss kısmı göz ardı edilir ve verilerde sıfırlar olarak görünür.

İdeal olarak C3'ün tip tarihi olmasını, C5'in tip zaman damgası olmasını ve HH: mm: ss bölümünün yok sayılmamasını istiyorum. Benim çözümüm şu an böyle görünüyor. Veriyi DB'mden paralel olarak çekerek csv'yi yapıyorum. Tüm tarihleri ​​zaman damgaları olarak aldığımdan emin olun (İdeal değil). Yani, deney CSV şimdi şuna benzer:

$ hadoop fs -cat new-test.csv 
a,b,c,2016-09-09 00:00:00,a,2016-11-11 09:09:09.0,a 
a,b,c,2016-09-10 00:00:00,a,2016-11-11 09:09:10.0,a 

Bu benim son çalışma kodudur: Burada

val textData = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("delimiter", ",") 
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss") 
    .schema(finalSchema) 
    .option("nullValue", "null") 
    .load("new-test.csv") 

, ben tam zaman damgası biçimini kullanın ("yyyy-AA-gg SS: mm : dateFormat içinde ss "). Ben c3 tarih ve C5 Zaman damgası türü (Spark sql türleri) olduğu finalSchema örneğini el ile oluşturuyorum. Bu şemada schema() işlevini kullanıyorum. aşağıdaki gibi çıktı şöyledir:

scala> finalSchema 
res4: org.apache.spark.sql.types.StructType = StructType(StructField(C0,StringType,true), StructField(C1,StringType,true), StructField(C2,StringType,true), StructField(C3,DateType,true), StructField(C4,StringType,true), StructField(C5,TimestampType,true), StructField(C6,StringType,true)) 

scala> textData.printSchema() 
root 
|-- C0: string (nullable = true) 
|-- C1: string (nullable = true) 
|-- C2: string (nullable = true) 
|-- C3: date (nullable = true) 
|-- C4: string (nullable = true) 
|-- C5: timestamp (nullable = true) 
|-- C6: string (nullable = true) 


scala> textData.show() 
+---+---+---+----------+---+--------------------+---+ 
| C0| C1| C2|  C3| C4|     C5| C6| 
+---+---+---+----------+---+--------------------+---+ 
| a| b| c|2016-09-09| a|2016-11-11 09:09:...| a| 
| a| b| c|2016-09-10| a|2016-11-11 09:09:...| a| 
+---+---+---+----------+---+--------------------+---+ 

var mı daha kolay bir veya bir csv dosyası (yani bir kıvılcım dataframe içine hem tarih ve zaman damgası türü vardır

Önemli linkler ayrıştırmasını kutu dışına? : muhtemelen beklenen sonucu getirmeyecektir önemsiz olmayan durumlar için anlaması seçenekle https://github.com/databricks/spark-csv

cevap

1


http://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options
Eğergörebileceğiniz gibi.:

if (field == null || field.isEmpty || field == nullValue) { 
    typeSoFar 
} else { 
    typeSoFar match { 
    case NullType => tryParseInteger(field) 
    case IntegerType => tryParseInteger(field) 
    case LongType => tryParseLong(field) 
    case DoubleType => tryParseDouble(field) 
    case TimestampType => tryParseTimestamp(field) 
    case BooleanType => tryParseBoolean(field) 
    case StringType => StringType 
    case other: DataType => 
     throw new UnsupportedOperationException(s"Unexpected data type $other") 

Sadece bir zaman damgası türü değil, bir tarih türü ile her sütunu maç çalışır, bu nedenle bu durum için "kutu çözeltisinden" mümkün değildir.Ama benim deneyimim ile "daha kolay" çözüm, şemayı doğrudan needed type ile tanımlamakta, sadece RDD ile eşleşen bir türün tüm verinin değerlendirilmediği bir tür kümesi ayarını önleyecektir. Son şemanız verimli bir çözümdür.