2016-04-11 9 views
0

Ben iki tane DataFrame varsa ve date, time, mid, binImbalance alanlara göre onlara katılmak ve listelerde timeB ve midB karşılık gelen değerleri toplamak istiyorum.Coll_list nasıl kullanılır?

val d1: DataFrame 
val d3: DataFrame 
val d2 = d3 
    .withColumnRenamed("date", "dateC") 
    .withColumnRenamed("milliSec", "milliSecC") 
    .withColumnRenamed("mid", "midC") 
    .withColumnRenamed("time", "timeC") 
    .withColumnRenamed("binImbalance", "binImbalanceC") 

    d1.join(d2, d1("date") === d2("dateC") and 
       d1("time") === d2("timeC") and 
       d1("mid") === d2("midC") 
     ) 
    .groupBy("date", "time", "mid", "binImbalance") 
    .agg(collect_list("timeB"),collect_list("midB")) 

Ama hatayı olsun bu işe yaramazsa:

Aşağıdaki kod ile denedim : Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35. Aynı zamanda, timeB sütunundan birini yeniden adlandırdıysam, değerleri bir listede toplayamazdım.

bir örnek sonucu olmalıdır:

+-----+---------+------+------------+---------+------+ 
| date|  time| mid|binImbalance| timeB| midB| 
+-----+---------+------+------------+---------+------+ 
| 1 |  1 | 10 |   1| 4 | 10 |   
| 2 |  2 | 20 |   2| 5 | 11 |    
| 3 |  3 | 30 |   3| 6 | 12 |    


+-----+---------+------+------------+---------+------+ 
| date|  time| mid|binImbalance| timeB| midB| 
+-----+---------+------+------------+---------+------+ 
| 1 |  1 | 10 |   1| 7 | 13 |   
| 2 |  2 | 20 |   2| 8 | 14 |    
| 3 |  3 | 30 |   3| 9 | 15 | 

SONUÇ:

+-----+---------+------+------------+---------+-----------+ 
| date|  time| mid|binImbalance| ListTime| ListMid | 
+-----+---------+------+------------+---------+-----------+ 
| 1 |  1 | 10 |   1| [4,7] | [10,13] |   
| 2 |  2 | 20 |   2| [5,8] | [11,14] |    
| 3 |  3 | 30 |   3| [6,9] | [12,15] | 

, Minimal Komple ve Doğrulanabilir örnek

d1   d2 
id data  id data  
-- ----  -- ---- 
1 1  1 2 
2 4  2 5 
3 6  3 3 

Result 
id list 
-- ---- 
1 [1,2] 
2 [4,5] 
3 [6,3] 
+0

Sorunuza 'd1.printSchema' ve 'd3.printSchema' ekleyebilir misiniz? –

cevap

0

asgari örnek üzerinde Çözüm:

import org.apache.spark.sql.functions.udf 

val aggregateDataFrames = udf((x: Double, y: Double) => Seq(x,y)) 

val d3 = d2.withColumnRenamed("id","id3") 
      .withColumnRenamed("data","data3") 

val joined = d1.join(d3, d1("id") === d3("id3")) 


val result = joined 
       .withColumn("list", aggregateDataFrames(joined("data"),joined("data3"))) 
       .select("id","list") 
+0

▪ val aggregateDataFrames: (Çift, İkili) => Seq [Çift] 'i kullanmak ve türleri 'udf' içinde bırakmak isteyebilirsiniz. Ayrıca, daha fazla tuşa basmayı engelleyen sütunlara (veya '$' (dolar işareti)) erişmek için '' '(tek bir kesme işareti) kullanın (ve aslında daha iyi olmasını isteyebilirsiniz). –