2013-12-16 19 views
16

Spark'deyim, bir Avro dosyasından RDD sahibiyim. Artık o RDD bazı dönüşümler yapmak ve bir Avro dosyası olarak geri kaydetmek istiyorum:Spark: Avro dosyasına yazma

val job = new Job(new Configuration()) 
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema)) 

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2)) 
    .saveAsNewAPIHadoopFile(outputPath, 
    classOf[AvroKey[GenericRecord]], 
    classOf[org.apache.hadoop.io.NullWritable], 
    classOf[AvroKeyOutputFormat[GenericRecord]], 
    job.getConfiguration) 

bu Spark çalışan Şema $ recordSchema seri hale getirilebilir değil şikayet ediyor.

.map çağrısından rahatsızlık duyarsam (ve sadece rdd.saveAsNewAPIHadoopFile'ım varsa), çağrı başarılı olur.

Burada yanlış olan ne yapıyorum?

Herhangi bir fikrin var mı?

+0

İstisna yığını izlemesini sağlayabilir misiniz? Spark, Hadoop ve Avro versiyon numaraları da faydalı olabilir. – Wildfire

+0

Lütfen naifliğimi bağışla. Burada işin ne olduğunu sorabilir miyim? Bir haritayı küçültmek gibi görünüyor mu? Kıvılcımı yazmak için kıvılcım kullanırsak, neden bir haritayı neden azaltmalıyız? –

cevap

2

Spark tarafından kullanılan varsayılan seri hale getirici, Java serileştirme işlemidir. Bu yüzden tüm java türleri için Java serileştirme kullanılarak serileştirilmeye çalışılacaktır. AvroKey seri hale getirilemez, bu yüzden hatalar alıyorsunuz.

KryoSerializer'ı veya özel serileştirmenizde (Avro gibi) eklentiyi kullanabilirsiniz. Burada serileştirme hakkında daha fazla bilgi edinebilirsiniz. http://spark-project.org/docs/latest/tuning.html

Nesnenizi, dışa aktarılabilen bir şeyle de sarabilirsiniz. Örneğin, AvroFlumeEvent öğesini sararlayan SparkFlumeEvent öğesine bakın: https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

5

Buradaki sorun, İş'te kullanılan avro.Schema sınıfının diziselleştirilememesi ile ilgilidir. Şema nesnesini harita işlevindeki koddan göndermeye çalıştığınızda istisna atılır. Sadece şema yeni bir örneğini oluşturarak çalışmak için her şeyi yapabilir

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
... 
rdd.map(t => { 
    // reference to the schema object declared outside 
    val record = new GenericData.Record(schema) 
}) 

şu şekildedir: Yapmanız denerseniz

Örneğin, sen "Görevi seri hale getirilebilir değil" istisna alacak fonksiyon bloğunun içinde: Eğer idare her kayıt için avro şema ayrıştırma istemez yana

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.map(t => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 
    val record = new GenericData.Record(innserSchema) 
    ... 
}) 

, daha iyi bir çözüm bölüm düzeyinde şemayı ayrıştırmak olacaktır. aşağıdakiler de çalışır:

val schema = new Schema.Parser().parse(new File(jsonSchema)) 
// The schema above should not be used in closures, it's for other purposes 
... 
rdd.mapPartitions(tuples => { 
    // create a new Schema object 
    val innserSchema = new Schema.Parser().parse(new File(jsonSchema)) 

    tuples.map(t => { 
    val record = new GenericData.Record(innserSchema) 
    ... 
    // this closure will be bundled together with the outer one 
    // (no serialization issues) 
    }) 
}) 

kod yukarıdaki sürece haritası işlevi birden çok uzak uygulayıcıları tarafından yürütülecek gidiyor beri, jsonSchema dosyasına taşınabilir bir referans sağlamak gibi çalışır. Bu, HDFS'deki bir dosyaya referans olabilir veya JAR'daki uygulama ile birlikte paketlenebilir (içeriğini ikinci durumda almak için sınıf yükleyici işlevlerini kullanırsınız). Spark ile Avro kullanmaya çalışıyorsunuz olanlar için

, bazı çözülmemiş derleme sorunları hala var olduğunu fark ve Maven'in POM aşağıdaki ithalat kullanmak zorunda:

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-mapred</artifactId> 
    <version>1.7.7</version> 
    <classifier>hadoop2</classifier> 
<dependency> 

Not "hadoop2" sınıflandırıcı. Sorunu https://issues.apache.org/jira/browse/SPARK-3039 adresinden izleyebilirsiniz.

+0

Bu yöntem, harita fonksiyonumuzda harici bağımlılıklar olmadığında iyi çalışır. Şemalaştırılabilir hale getirmenin herhangi bir yolu var mı? – COSTA