2016-08-05 41 views
6

reduce gibi bir performans istiyorum ancak operatörün değişmeli olmasına gerek yok. Yani, result takip edilmesinin her zaman "123456789" olmasını istiyorum.RDD'de herhangi bir işlem emri saklıyor mu?

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[24] at parallelize at <console>:24 

scala> val result = rdd.someAction{ _+_ } 

Öncelikle, fold bulduk. RDD#fold doc söyler:

def (zeroValue: T) kat kullanılarak, bütün bölümler için elemanları, her bölüm, ve daha sonra sonuç, T Agrega (op: (T, T) T ⇒) bir verilen birleştirici fonksiyonu ve nötr "sıfır değer" doc gerekmez değişmeli olduğu

not edin. Ancak, sonuç beklenmediği:

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)) 
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[48] at parallelize at <console>:24 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res22: String = 341276895 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res23: String = 914856273 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res24: String = 742539618 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res25: String = 271468359 
+0

Belgelerin bir sonraki bölümünü, ne gördüğünüzü açıklayan özlediniz: * "Bu, Scala gibi işlevsel dillerdeki dağıtılmamış koleksiyonlar için uygulanan katlama işlemlerinden biraz farklıdır. Bu katlama işlemi Bölümleri ayrı ayrı bölümlere ayırın ve daha sonra, belirli bir düzende sırayla her bir öğeye katlamayı uygulamak yerine bu sonuçları kesin sonuca katlayın .. Değişmez olmayan işlevler için, sonuç dağıtılmamış bir koleksiyona uygulanan bir katlamadan farklı olabilir. "* –

cevap

2

yoktur, bu orijinal RDD bir tek bölüm kaynaştırma düşünün göstermek için hiçbir Scala bu kriterleri karşılayan eylemi azaltmada yerleşik, ancak kolayca mapPartitions, collect ve yerel azalmalara birleştirerek kendi uygulayabilirsiniz :

import scala.reflect.ClassTag 

def orderedFold[T : ClassTag](rdd: RDD[T])(zero: T)(f: (T, T) => T): T = { 
    rdd.mapPartitions(iter => Iterator(iter.foldLeft(zero)(f))).collect.reduce(f) 
} 

fold tarafından kullanılan birleştirme yerine asenkron ve sırasız yöntem için collect ve reduce kombinasyonunu kullanarak, genel düzeni muhafaza edilmesini temin eder. Tabii

Bu gibi bazı ek maliyet ile gelir:

  • biraz daha yüksek bellek ayak izi sürücüsünde.
  • Önemli derecede daha yüksek gecikme süresi - yerel azaltmaya başlamadan önce tüm görevlerin bitmesini bekledik.
+0

Yardımlarınız için teşekkürler, bu her bölümün ** her zaman tüm RDD'nin sürekli bir alt dizisi ** olduğu anlamına mı geliyor? Bahsedilen bir belge var mı? – Eastsun

+0

Dokümanlarla ilgili olarak - bildiğim kadarıyla. Yine de, bazı sıralı yöntemlerin model ve sözleşmeleri tarafından az ya da çok kısıtlanmaktadır. Spark'deki asıl sorun genel dizinin nasıl belirleneceğidir. Genel olarak, a) siparişi ile ilgili bir gerekçeniz olduğunda iki durum vardır: a) açık sıralama (sözleşme ile) b) kullandığınızda, belirleyici sıralı bölünmeler üreten girdiniz olduğunda ve giriş ile mevcut nokta arasında karışmalar ve diğer veri hareketleri olmadığında. – zero323

1

olarak sipariş korumaz @YuvalItzchakov fold tarafından işaret:

scala> rdd.fold(""){ _+_ } 
res10: String = 312456879 

DÜZENLEME @ dk14 tarafından belirtildiği gibi hiçbir şans ile denedim sonuçları birleştirirken bölümlenmiş bir RDD.

scala> val rdd = sc.parallelize(1 to 9 map (_.toString)).coalesce(1) 
rdd: org.apache.spark.rdd.RDD[String] = CoalescedRDD[27] at coalesce at <console>:27 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res4: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res5: String = 123456789 

scala> rdd.zipWithIndex.map(x => (x._2, x._1)).sortByKey().map(_._2).fold(""){ _+_ } 
res6: String = 123456789 
+0

Bunu yapmanın, hesaplamanın paralellik yeteneklerini tamamen kaybetme dezavantajına maruz kalacağı unutulmamalıdır. –

+0

@YuvalItzchakov kesin; 'fold' ile, sipariş bölümlenmiş bir "RDD" içinde korunmayabilir. – elm

+0

Evet, anlıyorum. Ancak OP bunun farkında olmalıydı. –