2016-08-17 31 views
13

Spark's DataFrames ile çalışırken, sütunlardaki verileri eşlemek için Kullanıcı Tanımlı İşlevler (UDF'ler) gerekir. UDF'ler, bağımsız değişken türlerinin açıkça belirtilmesini gerektirir. Benim durumumda, nesne dizilerinden oluşan bir sütunu manipüle etmem gerekiyor ve ne tür bir kullanacağımı bilmiyorum. İşte bir örnek:Bir Spark DataFrame'deki bir nesne dizisini kabul eden bir UDF tanımlamak?

import sqlContext.implicits._ 

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects 
val data = sqlContext.read.json(sc.parallelize(Seq(
    """ 
    |{ 
    | "topic" : "pets", 
    | "subjects" : [ 
    | {"type" : "cat", "score" : 10}, 
    | {"type" : "dog", "score" : 1} 
    | ] 
    |} 
    """))) 

Bu sütunların

import org.apache.spark.sql.functions.size 
data.select($"topic", size($"subjects")).show 

+-----+--------------+ 
|topic|size(subjects)| 
+-----+--------------+ 
| pets|    2| 
+-----+--------------+ 

verilere yerleşik org.apache.spark.sql.functions gerçekleştirmek için temel işlemleri kullanmak çok kolay ve keyfi işlemleri gerçekleştirmek için özel UDF'leri yazmak için genellikle kolaydır

import org.apache.spark.sql.functions.udf 
val enhance = udf { topic : String => topic.toUpperCase() } 
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+ 
|UDF(topic)|size(subjects)| 
+----------+--------------+ 
|  PETS|    2| 
+----------+--------------+ 

Ama benim "denek" sütunundaki nesnelerin dizileri işlemek için bir UDF kullanmak isterseniz? UDF'deki argüman için ne tür kullanırım? Bunun yerine kıvılcım tarafından sağlanan birini kullanarak, boyut fonksiyonunu reimplement istiyorsanız Örneğin,:

val my_size = udf { subjects: Array[Something] => subjects.size } 
data.select($"topic", my_size($"subjects")).show 

Açıkça Array[Something] ne tür kullanmalıyım ... çalışmıyor !? Array[]'u boşaltabilir miyim? Etrafında dolaşmak bana scala.collection.mutable.WrappedArray'un onunla bir ilgisi olabilir, ama yine de sağlamanız gereken başka bir şey var.

cevap

16

Ne aradığınız Seq[o.a.s.sql.Row] geçerli:

import org.apache.spark.sql.Row 

val my_size = udf { subjects: Seq[Row] => subjects.size } 

Açıklama: Zaten bildiğiniz gibi

    ArrayType ait
  • Güncel gösterimi WrappedArray yüzden Array bunu çalışmak ve olmayacak, olan güvenli tarafta kalmak daha iyidir.
  • StructType için yerel tür 'dur. Maalesef bu, bireysel alanlara erişim güvenli değildir anlamına gelir. Notlar

:

  • udf geçirilen struct fonksiyonunu oluşturmak için Product tipini (Tuple* veya case class) değil dönmek zorundadır.
+0

Ben bu olsun: java.lang.UnsupportedOperationException: tip org.apache.spark.sql.Row için Şema org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection de desteklenmemektedir. scala: 733) org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 671) adresinde org.apache.spark.sql.functions $ .udf (functions.scala: 3076) . .. 134 elekler –

+0

@GuruprasadGV UDF 'struct'' için' Product' ('TupleN', case class') 'i döndürmelidir. – zero323