2016-02-18 15 views
6

Yinelemeli bir hesaplamanın sonuçlarını toplamak için bir RDD oluşturmak istiyorum.Yinelemeli bir hesaplamanın sonuçlarını toplamak için bir RDD oluşturma

import org.apache.spark.mllib.random.RandomRDDs._  

val n = 10 

val step1 = normalRDD(sc, n, seed = 1) 
val step2 = normalRDD(sc, n, seed = (step1.max).toLong) 
val result1 = step1.zip(step2) 
val step3 = normalRDD(sc, n, seed = (step2.max).toLong) 
val result2 = result1.zip(step3) 

... 

val step50 = normalRDD(sc, n, seed = (step49.max).toLong) 
val result49 = result48.zip(step50) 

(N, ara RDDs oluşturma ve sonunda birlikte daha sonra sıkıştırma aynı zamanda 50 RDDs uzun ok gibi olacaktır: Aşağıdaki kodunu değiştirmek için bir döngü (ya da herhangi bir alternatif) kullanabilir nasıl

tohum = saygı iteratif oluşturulur (adım (n-1) .max) durumu)

+0

bir oluşturmak için scalaz gelen 'Stream.unfold' kullanmayı tercih ediyorum adımların akışı ve ardından sıkıştır Kendisi ve/veya scanRight ile .. – Reactormonk

cevap

6

bir özyinelemeli işlevi çalışmak:

/** 
* The return type is an Option to handle the case of a user specifying 
* a non positive number of steps. 
*/ 
def createZippedNormal(sc : SparkContext, 
         numPartitions : Int, 
         numSteps : Int) : Option[RDD[Double]] = { 

    @scala.annotation.tailrec 
    def accum(sc : SparkContext, 
      numPartitions : Int, 
      numSteps : Int, 
      currRDD : RDD[Double], 
      seed : Long) : RDD[Double] = { 
    if(numSteps <= 0) currRDD 
    else { 
     val newRDD = normalRDD(sc, numPartitions, seed) 
     accum(sc, numPartitions, numSteps - 1, currRDD.zip(newRDD), newRDD.max) 
    } 
    } 

    if(numSteps <= 0) None 
    else Some(accum(sc, numPartitions, numSteps, sc.emptyRDD[Double], 1L)) 
} 
+0

Kuyruk özyineleme, RDD soyundan sizi koruyamayacaktır :) – zero323

+0

@ zero323 Kabul edildi. Bununla birlikte, bu sorun sorunun gerekleri ile içseldir. Herhangi bir cevap benzer bir sorun yaşayacaktır. –

+0

Sadece kuyruğun optimize edilmeyeceği sahnelerin arkasında özyinelemeli bir veri yapısı oluşturduğunuzu belirtmek istedim. Daha fazla bir şey :) Ve aslında bunu çözebilir ve kontrol noktalarını kullanarak problemden kaçabilirsiniz. Tek bir zip olmadan bile çözülebilir :) – zero323