içinde UserDefinedAggregateFunction (UDAF) 'da otomatik olarak değişmez hale gelir Spark içindeki bir UserDefinedAggregateFunction (UDAF) tanımlamaya çalışıyorum; bu, bir grup sütunundaki her bir benzersiz değer için gerçekleşen olayların sayısını sayar. Niçin Değişken harita kıvılcım
Bu
bir örnek:+----+----+
|col1|col2|
+----+----+
| a| a1|
| a| a1|
| a| a2|
| b| b1|
| b| b2|
| b| b3|
| b| b1|
| b| b1|
+----+----+
Ben
val func = new DistinctValues
Sonra
df dataframe uygulamak bir UDAF DistinctValues olacak, ben dataframe böyledf
olduğunu varsayalım
val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV"))
Böyle bir şey olmasını bekliyorum e bu:
+----+--------------------------+
|col1|DV |
+----+--------------------------+
| a| Map(a1->2, a2->1) |
| b| Map(b1->3, b2->1, b3->1)|
+----+--------------------------+
yüzden, böyle bir UDAF çıktı
Sonra
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import org.apache.spark.sql.types.LongType
import Array._
class DistinctValues extends UserDefinedAggregateFunction {
def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)
def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType))::Nil)
def dataType: DataType = MapType(StringType, LongType)
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = scala.collection.mutable.Map()
}
def update(buffer: MutableAggregationBuffer, input: Row) : Unit = {
val str = input.getAs[String](0)
var mp = buffer.getAs[scala.collection.mutable.Map[String, Long]](0)
var c:Long = mp.getOrElse(str, 0)
c = c + 1
mp.put(str, c)
buffer(0) = mp
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) : Unit = {
var mp1 = buffer1.getAs[scala.collection.mutable.Map[String, Long]](0)
var mp2 = buffer2.getAs[scala.collection.mutable.Map[String, Long]](0)
mp2 foreach {
case (k ,v) => {
var c:Long = mp1.getOrElse(k, 0)
c = c + v
mp1.put(k ,c)
}
}
buffer1(0) = mp1
}
def evaluate(buffer: Row): Any = {
buffer.getAs[scala.collection.mutable.Map[String, LongType]](0)
}
}
benim dataframe bu fonksiyona sahiptir
Böyle hata verdival func = new DistinctValues
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV"))
,
func: DistinctValues = [email protected]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 884, ip-172-31-22-166.ec2.internal): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map
at $iwC$$iwC$DistinctValues.update(<console>:39)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:431)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:187)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:180)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:116)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
update(buffer: MutableAggregationBuffer, input: Row)
yöntem, değişken buffer
bir immutable.Map
olduğunu yorgun programı, mutable.Map
için
dökmeyi Ama initialize(buffer: MutableAggregationBuffer, input:Row)
yönteminde buffer
değişkeni başlatmak için mutable.Map
kullandı. update
yöntemine aynı değişken iletildi mi? Ayrıca buffer
mutableAggregationBuffer
, bu nedenle, değişebilir olmalıdır, değil mi?
Neden benim mutable.Map değiştirilemedi? Ne oldu bilen var mı?
Görevi tamamlamak için bu işlevin gerçekten değişebilir bir Haritasına ihtiyacım var. Değişmez haritadan bir değişken harita oluşturmak için bir geçici çözüm olduğunu biliyorum, ardından güncelleyin. Ama gerçekten, programın değişebilir olanının neden otomatik olarak programda değişmez olduğunu bilmek istiyorum, bu bana mantıklı gelmiyor.
İyi tutuş! Hmm, 'Yapısal Tür 'içindeki MapyType' durumda olabilir. Ama kendimi tanımlamam sürece, spark.sql.types'de başka bir değişken harita türü yok. –
Dediğim gibi yapma - sadece değişmez bir 'Map' kullan. 'mp = mp + (k -> c)' değişmez bir 'Harita' üzerinde size' mp ile aynı işlevselliği verir.(k, c) 'yi bir' Harita ' –
'mp = mp + (k -> c)' üzerine yerleştir Scala'da yeniyim, bunun gibi değişmez bir veri türünü değiştirebileceğinizi bilmiyordum. Çok teşekkür ederim! –