Kendi bölümleyicimi uygulayıp orijinal rdd'yi karıştırmaya çalıştığımda bir sorunla karşılaşıyorum. Buorg.apache.spark.SparkException: Görev dizileştirilemiyor, o
ekledikten sonra, Serializable olmayan işlevleri başvurarak yol açmasına karşın, biliyorum her Alakalı sınıfa
Serializable uzanır, bu sorun hala var. Ne yapmalıyım? İplik "ana" org.apache.spark.SparkException içinde
İstisna: org.apache.spark at : org.apache.spark.util.ClosureCleaner $ .ensureSerializable (166 ClosureCleaner.scala) de seri hale getirilebilir değil Görev org.apache.spark.SparkContext.clean de (SparkContext.scala: 1622): .util.ClosureCleaner $ .clean (158 ClosureCleaner.scala)
object STRPartitioner extends Serializable{
def apply(expectedParNum: Int,
sampleRate: Double,
originRdd: RDD[Vertex]): Unit= {
val bound = computeBound(originRdd)
val rdd = originRdd.mapPartitions(
iter => iter.map(row => {
val cp = row
(cp.coordinate, cp.copy())
}
)
)
val partitioner = new STRPartitioner(expectedParNum, sampleRate, bound, rdd)
val shuffled = new ShuffledRDD[Coordinate, Vertex, Vertex](rdd, partitioner)
shuffled.setSerializer(new KryoSerializer(new SparkConf(false)))
val result = shuffled.collect()
}
class STRPartitioner(expectedParNum: Int,
sampleRate: Double,
bound: MBR,
rdd: RDD[_ <: Product2[Coordinate, Vertex]])
extends Partitioner with Serializable {
...
}