2015-05-20 19 views
6

Çalıştığım DataFrame'im var ve bir dizi sütunla gruplamak ve sütunların geri kalanında grup başına işlem yapmak istiyorum. DataFrame yılındaSpark DataFrame: Gruplarda çalıştır

rdd.map(tup => ((tup._1, tup._2, tup._3), tup)). 
    groupByKey(). 
    forEachPartition(iter => doSomeJob(iter)) 

.arazi böyle başlardım:

df.groupBy("col1", "col2", "col3") // Reference by name 

ama sonra ameliyat nasıl emin değilim düzenli RDD .arazi ben şöyle bir şey olmazdı düşünüyorum İşlemlerim GroupedData tarafından sunulan ortalama/dak/maks/sayımdan daha karmaşıksa, gruplar.

Örneğin, I, (grup ilişkili s yineleme ile) ("col1", "col2", "col3") grup başına tek bir MongoDB belge oluşturmak N bölümleri aşağı ölçek, daha sonra bir MongoDB veritabanına belgeler eklemek istiyorum. N limiti, istediğim maksimum eşzamanlı bağlantı sayısıdır.

Herhangi bir öneri?

+2

Best Way: Bir UDAF yaz (henüz desteklenmiyor, bkz. SPARK-4233 ve SPARK-3947). O zamana kadar, –

cevap

1

Kendiniz birleştirme yapabilirsiniz. İlk grupları olsun:

val groups = df.groupBy($"col1", $"col2", $"col3").agg($"col1", $"col2", $"col3") 

Sonra orijinal DataFrame bunu geri katılabilirsiniz: Bu size başlangıçta vardı tam olarak aynı veriyi alırken

val joinedDF = groups 
    .select($"col1" as "l_col1", $"col2" as "l_col2", $"col3" as "l_col3) 
    .join(df, $"col1" <=> $"l_col1" and $"col2" <=> $"l_col2" and $"col3" <=> $"l_col3") 

(ve 3 ek, gereksiz sütunlarla) Satırla ilişkili (col1, col2, col3) grubu için MongoDB belge kimliğine sahip bir sütun eklemek için başka bir birleştirme yapabilirsiniz.

Her halükarda, deneyimime katılır ve kendiliğinden bağlantılar DataFrames'te karmaşık şeylerle uğraşma biçiminizdir.

+0

oluşturmak istediğinizi elde etmek için aggregateByKey gibi RDD yöntemlerine erişmek için DF.RDD'yi kullanın. Oradan nasıl alacağımı bilmiyorum - tüm "$" col4'le yinelememe izin ver "' & '$ "col5" 'belirli bir kombinasyonla ilişkili değerler' ($ "col1", $ "col2", $ "col3") '? –

+0

DataFrames'ın çalışma şekli, yalnızca iki seçeneğiniz vardır. Ya Gruplar'da Dizi gibi bir şey yaparsınız, daha sonra bir foreach yaparsınız ve döngüler içinde cols1'e cols3'e filtre uygulayarak bir DataFrame yaratırsınız. Ya da hepsini tek bir DataFrame'de yapmak zorunda olduğum gibi karmaşık birleşimleri kullanarak yapmak zorundasınız. –