2016-04-14 16 views
5

içinde UserDefinedAggregateFunction (UDAF) 'da otomatik olarak değişmez hale gelir Spark içindeki bir UserDefinedAggregateFunction (UDAF) tanımlamaya çalışıyorum; bu, bir grup sütunundaki her bir benzersiz değer için gerçekleşen olayların sayısını sayar. Niçin Değişken harita kıvılcım

Bu

bir örnek:
+----+----+ 
|col1|col2| 
+----+----+ 
| a| a1| 
| a| a1| 
| a| a2| 
| b| b1| 
| b| b2| 
| b| b3| 
| b| b1| 
| b| b1| 
+----+----+ 

Ben

val func = new DistinctValues 

Sonra

df dataframe uygulamak bir UDAF DistinctValues ​​olacak, ben dataframe böyle df olduğunu varsayalım
val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV")) 

Böyle bir şey olmasını bekliyorum e bu:

+----+--------------------------+ 
|col1|DV      | 
+----+--------------------------+ 
| a| Map(a1->2, a2->1)  | 
| b| Map(b1->3, b2->1, b3->1)| 
+----+--------------------------+ 

yüzden, böyle bir UDAF çıktı

Sonra
import org.apache.spark.sql.expressions.MutableAggregationBuffer 
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types.StructField 
import org.apache.spark.sql.types.DataType 
import org.apache.spark.sql.types.ArrayType 
import org.apache.spark.sql.types.StringType 
import org.apache.spark.sql.types.MapType 
import org.apache.spark.sql.types.LongType 
import Array._ 

class DistinctValues extends UserDefinedAggregateFunction { 
    def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil) 

    def dataType: DataType = MapType(StringType, LongType) 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = scala.collection.mutable.Map() 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) : Unit = { 
    val str = input.getAs[String](0) 
    var mp = buffer.getAs[scala.collection.mutable.Map[String, Long]](0) 
    var c:Long = mp.getOrElse(str, 0) 
    c = c + 1 
    mp.put(str, c) 
    buffer(0) = mp 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = { 
    var mp1 = buffer1.getAs[scala.collection.mutable.Map[String, Long]](0) 
    var mp2 = buffer2.getAs[scala.collection.mutable.Map[String, Long]](0) 
    mp2 foreach { 
     case (k ,v) => { 
      var c:Long = mp1.getOrElse(k, 0) 
      c = c + v 
      mp1.put(k ,c) 
     } 
    } 
    buffer1(0) = mp1 
    } 

    def evaluate(buffer: Row): Any = { 
     buffer.getAs[scala.collection.mutable.Map[String, LongType]](0) 
    } 
} 

benim dataframe bu fonksiyona sahiptir

Böyle hata verdi
val func = new DistinctValues 
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV")) 

,

func: DistinctValues = [email protected] 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 884, ip-172-31-22-166.ec2.internal): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map 
at $iwC$$iwC$DistinctValues.update(<console>:39) 
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:431) 
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:187) 
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:180) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:116) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152) 
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

update(buffer: MutableAggregationBuffer, input: Row) yöntem, değişken buffer bir immutable.Map olduğunu yorgun programı, mutable.Map için

dökmeyi Ama initialize(buffer: MutableAggregationBuffer, input:Row) yönteminde buffer değişkeni başlatmak için mutable.Map kullandı. update yöntemine aynı değişken iletildi mi? Ayrıca buffermutableAggregationBuffer, bu nedenle, değişebilir olmalıdır, değil mi?

Neden benim mutable.Map değiştirilemedi? Ne oldu bilen var mı?

Görevi tamamlamak için bu işlevin gerçekten değişebilir bir Haritasına ihtiyacım var. Değişmez haritadan bir değişken harita oluşturmak için bir geçici çözüm olduğunu biliyorum, ardından güncelleyin. Ama gerçekten, programın değişebilir olanının neden otomatik olarak programda değişmez olduğunu bilmek istiyorum, bu bana mantıklı gelmiyor.

cevap

4

StructType ürününüzün MapType olduğuna inanmak. Bu nedenle, , değişmez olan bir Map tutar.

Bunu dönüştürebilir, ama neden sadece bu değişmez bırakın ve ne yapmadığını:

mp = mp + (k -> c) 

değişmez Map bir giriş eklenir? Aşağıda

Çalışma örneği:

class DistinctValues extends UserDefinedAggregateFunction { 
    def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("_2", IntegerType) :: Nil) 

    def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil) 

    def dataType: DataType = MapType(StringType, LongType) 
    def deterministic: Boolean = true 

    def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = Map() 
    } 

    def update(buffer: MutableAggregationBuffer, input: Row) : Unit = { 
    val str = input.getAs[String](0) 
    var mp = buffer.getAs[Map[String, Long]](0) 
    var c:Long = mp.getOrElse(str, 0) 
    c = c + 1 
    mp = mp + (str -> c) 
    buffer(0) = mp 
    } 

    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = { 
    var mp1 = buffer1.getAs[Map[String, Long]](0) 
    var mp2 = buffer2.getAs[Map[String, Long]](0) 
    mp2 foreach { 
     case (k ,v) => { 
      var c:Long = mp1.getOrElse(k, 0) 
      c = c + v 
      mp1 = mp1 + (k -> c) 
     } 
    } 
    buffer1(0) = mp1 
    } 

    def evaluate(buffer: Row): Any = { 
     buffer.getAs[Map[String, LongType]](0) 
    } 
} 
+0

İyi tutuş! Hmm, 'Yapısal Tür 'içindeki MapyType' durumda olabilir. Ama kendimi tanımlamam sürece, spark.sql.types'de başka bir değişken harita türü yok. –

+0

Dediğim gibi yapma - sadece değişmez bir 'Map' kullan. 'mp = mp + (k -> c)' değişmez bir 'Harita' üzerinde size' mp ile aynı işlevselliği verir.(k, c) 'yi bir' Harita ' –

+0

'mp = mp + (k -> c)' üzerine yerleştir Scala'da yeniyim, bunun gibi değişmez bir veri türünü değiştirebileceğinizi bilmiyordum. Çok teşekkür ederim! –