2017-03-08 55 views
5

olarak değişiyor UD'ye kıvılcımdaki bir yapıyı aktarmaya çalışıyorum. Alan adlarını değiştiriyor ve sütun pozisyonuna yeniden ad veriyor. Nasıl düzeltebilirim?Spark Yapı yapısal adları UDF

object TestCSV { 

      def main(args: Array[String]) { 

      val conf = new SparkConf().setAppName("localTest").setMaster("local") 
      val sc = new SparkContext(conf) 
      val sqlContext = new SQLContext(sc) 


      val inputData = sqlContext.read.format("com.databricks.spark.csv") 
        .option("delimiter","|") 
        .option("header", "true") 
        .load("test.csv") 


      inputData.printSchema() 

      inputData.show() 

      val groupedData = inputData.withColumn("name",struct(inputData("firstname"),inputData("lastname"))) 

      val udfApply = groupedData.withColumn("newName",processName(groupedData("name"))) 

      udfApply.show() 
      } 



      def processName = udf((input:Row) =>{ 

       println(input) 
       println(input.schema) 

       Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname")) 

       }) 

     } 

Çıktı:

root 
|-- id: string (nullable = true) 
|-- firstname: string (nullable = true) 
|-- lastname: string (nullable = true) 

+---+---------+--------+ 
| id|firstname|lastname| 
+---+---------+--------+ 
| 1|  jack| reacher| 
| 2|  john|  Doe| 
+---+---------+--------+ 

Hata:

[jack,reacher] StructType(StructField(i[1],StringType,true), > StructField(i[2],StringType,true)) 17/03/08 09:45:35 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) java.lang.IllegalArgumentException: Field "firstname" does not exist.

+0

Neden iki dizeyi ("Dizeler" gibi) doğrudan udf'ye aktarmıyorsunuz? –

+0

Bu mümkündür, ancak Spark UDF'de 10'dan fazla alanı argüman olarak geçemezsiniz. Burada verdiğim, basitleştirilmiş bir kullanım vakası. Bazen UDF'de 20'den fazla sütun geçirmem gerekiyor. Bunu nasıl başarabilirim? – hp2326

cevap

1

Ne karşılaşmış gerçekten garip. Biraz oynadıktan sonra en sonunda optimizasyon motoru ile ilgili bir sorun olabileceğini düşündüm. Sorun, UDF değil, struct işlevi gibi görünüyor.

ben senin bildirilen özel durum olsun önbelleğe alma olmadan, ben groupedDatacache zaman (Spark 1.6.3) işe:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.hive.HiveContext 
import org.apache.spark.{SparkConf, SparkContext} 


object Demo { 

    def main(args: Array[String]): Unit = { 

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[1]")) 
    val sqlContext = new HiveContext(sc) 
    import sqlContext.implicits._ 
    import org.apache.spark.sql.functions._ 


    def processName = udf((input: Row) => { 
     Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname")) 
    }) 


    val inputData = 
     sc.parallelize(
     Seq(("1", "Kevin", "Costner")) 
    ).toDF("id", "firstname", "lastname") 


    val groupedData = inputData.withColumn("name", struct(inputData("firstname"), inputData("lastname"))) 
     .cache() // does not work without cache 

    val udfApply = groupedData.withColumn("newName", processName(groupedData("name"))) 
    udfApply.show() 
    } 
} 

Alternatif olarak, web yapı yapmak için RDD API kullanabilirsiniz, ancak bu gerçekten güzel değil:

case class Name(firstname:String,lastname:String) // define outside main 

val groupedData = inputData.rdd 
    .map{r => 
     (r.getAs[String]("id"), 
      Name(
      r.getAs[String]("firstname"), 
      r.getAs[String]("lastname") 
     ) 
     ) 
    } 
    .toDF("id","name") 
+0

Teşekkürler @Raphael Roth. Bu şimdilik benim için çalıştı. Bu cevabı kabul edeceğim. – hp2326