2017-09-26 98 views
7

EMR'de kaynak yöneticisi olarak ve 2 düğümde YARN ile bir kıvılcım çalıştırıyorum. Koşulum yerine getirilmediyse, adımı bilerek başarısız olmam gerekiyor, bu yüzden sonraki adım yapılandırmaya göre yürütülmüyor. Bunu başarmak için dynamoDB'ye bir günlük mesajı ekledikten sonra özel bir istisna atarım.Kıvılcım, SparkException EMR içinde atanırken yanlış davranış

İyi çalışıyor ama Dinamo'daki kayıt iki kez ekleniyor.

Kodum aşağıda.

if(<condition>) { 
    <method call to insert in dynamo> 
    throw new SparkException(<msg>); 
    return; 
} 

Satır istisnası yapmak için satırı kaldırırsam, iyi çalışır, ancak adım tamamlanır.

Günlük iletisini iki kez almadan adımı nasıl başarısız yapabilirim.

Yardımlarınız için teşekkürler. Hata koşulu vurmak ve iki farklı uygulayıcıları tarafından işlenen çünkü

Selamlar, Sorabh

cevap

2

Muhtemelen dinamo mesajı takıldı sebebi iki kere oldu. Kıvılcım işçiler arasında yapılacak işi bölüyor ve işçiler hiçbir bilgiyi paylaşmıyor.

Spark adımı FAIL olana sahip olmak için gereksiniminizi sürdürmekte olduğunuzdan emin değilim, ancak bunun yerine, doğrudan kıvılcım ölmek yerine uygulama kodunuzda bu hata durumunu izlemenizi öneririm. Diğer bir deyişle, hatayı algılayan ve bunu kıvılcım sürücünüze geri aktaran kod yazınız, daha sonra uygun şekilde hareket ediniz.

Bunu yapmanın bir yolu, verilerinizi işlerken meydana gelen hataları saymak için bir toplayıcı kullanmak olabilir. Kabaca böyle bir şey (Ben scala ve DataFrames farz ediyorum, ama gerektiği gibi RDD en ve/veya piton uyum sağlayabilir) görünecektir:

val accum = sc.longAccumulator("Error Counter") 
def doProcessing(a: String, b: String): String = { 
    if(condition) { 
    accum.add(1) 
    null 
    } 
    else { 
    doComputation(a, b) 
    } 
} 
val doProcessingUdf = udf(doProcessing _) 

df = df.withColumn("result", doProcessing($"a", $"b")) 

df.write.format(..).save(..) // Accumulator value not computed until an action occurs! 

if(accum.value > 0) { 
    // An error detected during computation! Do whatever needs to be done. 
    <insert dynamo message here> 
} 

Eğer geribildirim arıyorsanız bu yaklaşım hakkında güzel bir şey olduğunu Kıvılcım UI'sinde, çalışırken akümülatör değerlerini görebileceksiniz. Referans için, burada akümülatörler ile ilgili belgeler: http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators