Spark's DataFrames ile çalışırken, sütunlardaki verileri eşlemek için Kullanıcı Tanımlı İşlevler (UDF'ler) gerekir. UDF'ler, bağımsız değişken türlerinin açıkça belirtilmesini gerektirir. Benim durumumda, nesne dizilerinden oluşan bir sütunu manipüle etmem gerekiyor ve ne tür bir kullanacağımı bilmiyorum. İşte bir örnek:Bir Spark DataFrame'deki bir nesne dizisini kabul eden bir UDF tanımlamak?
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
Bu sütunların
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
verilere yerleşik
org.apache.spark.sql.functions
gerçekleştirmek için temel işlemleri kullanmak çok kolay ve keyfi işlemleri gerçekleştirmek için özel UDF'leri yazmak için genellikle kolaydır
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
Ama benim "denek" sütunundaki nesnelerin dizileri işlemek için bir UDF kullanmak isterseniz? UDF'deki argüman için ne tür kullanırım? Bunun yerine kıvılcım tarafından sağlanan birini kullanarak, boyut fonksiyonunu reimplement istiyorsanız Örneğin,:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
Açıkça Array[Something]
ne tür kullanmalıyım ... çalışmıyor !? Array[]
'u boşaltabilir miyim? Etrafında dolaşmak bana scala.collection.mutable.WrappedArray
'un onunla bir ilgisi olabilir, ama yine de sağlamanız gereken başka bir şey var.
Ben bu olsun: java.lang.UnsupportedOperationException: tip org.apache.spark.sql.Row için Şema org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection de desteklenmemektedir. scala: 733) org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 671) adresinde org.apache.spark.sql.functions $ .udf (functions.scala: 3076) . .. 134 elekler –
@GuruprasadGV UDF 'struct'' için' Product' ('TupleN', case class') 'i döndürmelidir. – zero323