2015-05-21 8 views
6

Spark Scala API'sini kullanıyorum.Spark SQL DataFrame'i flatMap ile nasıl kullanılır?

root 
|-- ids: array (nullable = true) 
| |-- element: map (containsNull = true) 
| | |-- key: integer 
| | |-- value: string (valueContainsNull = true) 
|-- match: array (nullable = true) 
| |-- element: integer (containsNull = true) 

Esasen 2 sütun [kimlikleri: Liste [Harita [Int, dize]], maç: Liste [Uluslararası]] Aşağıdaki şema ile Spark, SQL DataFrame (bir Avro dosyadan okuma) var. Ne yapmak istiyorum 3 sütun [id, özelliğini, maç] üretmek için her satır flatMap() olduğunu

[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)] 
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)] 
... 

: benziyor Örnek veri. veri girişi olarak yukarıdaki 2 satır kullanarak biz alacağı:

[1,a,0] 
[2,b,0] 
[3,c,1] 
[4,d,0] 
[5,c,1] 
[6,a,0] 
[7,e,1] 
[8,d,0] 
... 

ve sonra groupByStringmülkiyet (örn: a, b, ...) üretmek için count("property") ve sum("match"):

a 2 0 
b 1 0 
c 2 2 
d 2 0 
e 1 1 

ben böyle bir şey yapmak ister ki:

val result = myDataFrame.select("ids","match").flatMap( 
    (row: Row) => row.getList[Map[Int,String]](1).toArray()) 
result.groupBy("property").agg(Map(
    "property" -> "count", 
    "match" -> "sum")) 

sorunolmasıdırDataFrame'i RDD'ye dönüştürür. DataFrames kullanarak flatMap tipi işlem yapmanın ve groupBy yapmanın iyi bir yolu var mı?

cevap

8

flatMap ne yapar? Her giriş satırını 0 veya daha fazla satıra dönüştürür. Bunları filtreleyebilir veya yenilerini ekleyebilir. SQL'de aynı işlevselliği elde etmek için join kullanın. join ile ne yapmak istiyorsan onu yapabilir misin?

Alternatif olarak, aynı zamanda join sadece belirli tür olan Dataframe.explode bakabilir (kolayca oluşturabileceği kendi explode bir UDF bir DataFrame katılarak). explode, giriş olarak tek bir sütunu alır ve bölmenizi veya birden çok değere dönüştürmenizi sağlar ve ardından join orijinal satırı yeniden yeni satırlara geri döndürür. Yani:

user  groups 
griffin mkt,it,admin 

haline gelebilir:

user  group 
griffin mkt 
griffin it 
griffin admin 

Yani DataFrame.explode bakmak demek ve kolayca oraya almazsa, UDF ile birleşir çalışacaktı.

+0

Cevabınız için teşekkür ederiz! DataFrame.explode yöntemi tam olarak aradığım şey. –

0

Benim SQL biraz paslı, ama bir satır nesneleri listesi üretmek için flatMap bir seçenek ve daha sonra elde edilen RDD bir DataFrame dönüştürebilirsiniz.