2016-02-11 21 views
5

Başlamak için 2.30.4 scala kullanıyorum ve yukarıdaki örnek Spark 1,6'da çalıştırılıyor (Spark'un bununla bir alakası olduğundan şüphe etmeme rağmen, sadece bir serileştirme sorunu).Serialization ile Scala yansıması (Spark üzerinden) - Sunulabilir olmayan semboller

İşte benim sorunum var: iki sınıf B1 ve B2 tarafından uygulanan bir özelliğim var Base varsayalım. Şimdi, sınıfların bir koleksiyonu tarafından genişletilen genel bir özelliği var, bunlardan biri Base türbinin alt türleri üzerinde. (Burada Renault Pilot Geliştirme Spark'ın nosyonunu tutmak, ancak tefrika olarak da başka aslında en kısa sürede bir şey olabilir; Bir şey aslında ne olursa olsun sadece bir sonuçtur): Bir alacağını

trait Foo[T] { def function(rdd: RDD[T]): Something } 
class Foo1[B <: Base] extends Foo[B] { def function(rdd: RDD[B]): Something = ... } 
class Foo2 extends Foo[A] { def function(rdd: RDD[A]): Something = ... } 
... 

Şimdi bir nesneye ihtiyacınız RDD[T] (burada hiçbir sakıncalılık olmadığını varsayalım, sadece basitleştirilmiş bir versiyon) Something türünü döndürerek T tipine karşılık gelen işlevin sonucuna karşılık gelir. Ancak, birleştirme stratejisi ile Array[T] için de çalışmalıdır. Ne yazık ki

object Obj { 
    def compute[T: TypeTag](input: RDD[T]): Something = { 
     typeOf[T] match { 
     case t if t <:< typeOf[A] => 
      val foo = new Foo[T] 
      foo.function(rdd) 
     case t if t <:< typeOf[Array[A]] => 
      val foo = new Foo[A] 
      foo.function(rdd.map(x => mergeArray(x.asInstance[Array[A]]))) 
     case t if t <:< typeOf[Base] => 
      val foo = new Foo[T] 
      foo.function(rdd) 
     // here it gets ugly... 
     case t if t <:< typeOf[Array[_]] => // doesn't fall through with Array[Base]... why? 
      val tt = getSubInfo[T](0) 
      val tpe = tt.tpe 
      val foo = new Foo[tpe.type] 
      foo.function(rdd.map(x => (x._1, mergeArray(x._2.asInstanceOf[Array[tpe.type]])) 
     } 
    } 

    // strategy to transform arrays of T into a T object when possible 
    private def mergeArray[T: TypeTag](a: Array[T]): T = ... 

    // extract the subtype, e.g. if Array[Int] then at position 0 extracts a type tag for Int, I can provide the code but not fondamental for the comprehension of the problem though 
    private def getSubInfo[T: TypeTag](i: Int): TypeTag[_] = ... 
} 

, yerel bir makinede çok iyi çalışıyor görünüyor, ancak (tefrika) Kıvılcım gönderilen aldığında, ben bir org.apache.spark.SparkException: Task not serializable ile olsun:

Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol 
Serialization stack: 
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package types) 
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol) 

I Şimdiye kadar benzediğini bir geçici çözüm (oldukça açık, sayısız olasılıklar) var, ama benim merakım için, bunu düzeltmek için bir yol var mı? Ve neden Semboller serileştirilebilir değil, Manifest'lerde eşdeğerleri ise?

Yardımlarınız için teşekkürler.

cevap

1

TypeTag'lar genellikle artık scala'da serileştirilebilirler, ancak, doğrudan, türler değillerdir (bu durum, tiptags olmayan semboller içerdiğinden gariptir: - /).

Bu

// implicit constructor TypeTag parameter is serialized. 
abstract class TypeAware[T:TypeTag] extends Serializable { 
    def typ:Type = _typeCached 

    @transient 
    lazy val _typeCached:Type = typeOf[T] 
} 

trait Foo[T] extends Serializable { 
    def function(rdd: RDD[T]): Something {... impl here?...} 
    def typ:Type 
} 

class Concrete[T:TypeTag] extends TypeAware[T] with Foo[T] with Serializable{ 
    def function(rdd: RDD[T]): Something {... impl here?...} 
} 
+0

istediğini yapabilir ben TypeApi aynı zamanda – tribbloid

+0

Kesinlikle scala.reflect.internal.Symbols $ PackageClassSymbol yüzden bu işe yaramaz içeren çalışır inanıyoruz. (Scala 2.11'de) https://github.com/scala/scala/pull/3817 adresinde birleştirme isteği vardı – user48956