2016-06-16 16 views
12

Verilen iki Spark Veri Setleri, A ve B ı aşağıdaki gibi tek bir sütun üzerinde katılmak yapabilirsiniz:Veri kümelerine çoklu sütunlarda nasıl katılır?

a.joinWith(b, $"a.col" === $"b.col", "left") 

sorum Birden çok sütun kullanarak katılmak yapabilirsiniz olup olmadığıdır. Esasen şu DataFrames api kod eşdeğeri: Dataframe olduğu gibi

a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left") 

cevap

12

Yapabilirsin aynı şekilde:

Spark < 2.0.0 yılında
val xs = Seq(("a", "foo", 2.0), ("x", "bar", -1.0)).toDS 
val ys = Seq(("a", "foo", 2.0), ("y", "bar", 1.0)).toDS 

xs.joinWith(ys, xs("_1") === ys("_1") && xs("_2") === ys("_2"), "left").show 
// +------------+-----------+ 
// |   _1|   _2| 
// +------------+-----------+ 
// | [a,foo,2.0]|[a,foo,2.0]| 
// |[x,bar,-1.0]|  null| 
// +------------+-----------+ 

böyle bir şey kullanabilirsiniz :

xs.as("xs").joinWith(
    ys.as("ys"), ($"xs._1" === $"ys._1") && ($"xs._2" === $"ys._2"), "left") 
7

birbiri ardına where tane zincirleme tarafından katılma başka yolu yok. Önce, where operatör (ler) ardından (isteğe bağlı olarak kendi türünü ve) katılmak belirtmek yani

scala> case class A(id: Long, name: String) 
defined class A 

scala> case class B(id: Long, name: String) 
defined class B 

scala> val as = Seq(A(0, "zero"), A(1, "one")).toDS 
as: org.apache.spark.sql.Dataset[A] = [id: bigint, name: string] 

scala> val bs = Seq(B(0, "zero"), B(1, "jeden")).toDS 
bs: org.apache.spark.sql.Dataset[B] = [id: bigint, name: string] 

scala> as.join(bs).where(as("id") === bs("id")).show 
+---+----+---+-----+ 
| id|name| id| name| 
+---+----+---+-----+ 
| 0|zero| 0| zero| 
| 1| one| 1|jeden| 
+---+----+---+-----+ 


scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).show 
+---+----+---+----+ 
| id|name| id|name| 
+---+----+---+----+ 
| 0|zero| 0|zero| 
+---+----+---+----+ 

böyle bir goodie nedeni Kıvılcım iyileştirici birine (hayır cinas tasarlamak) ardışık where s katılacak olmasıdır join ile. Temel mantıksal ve fiziksel planları görmek için explain operatörünü kullanın.

scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).explain(extended = true) 
== Parsed Logical Plan == 
Filter (name#31 = name#36) 
+- Filter (id#30L = id#35L) 
    +- Join Inner 
     :- LocalRelation [id#30L, name#31] 
     +- LocalRelation [id#35L, name#36] 

== Analyzed Logical Plan == 
id: bigint, name: string, id: bigint, name: string 
Filter (name#31 = name#36) 
+- Filter (id#30L = id#35L) 
    +- Join Inner 
     :- LocalRelation [id#30L, name#31] 
     +- LocalRelation [id#35L, name#36] 

== Optimized Logical Plan == 
Join Inner, ((name#31 = name#36) && (id#30L = id#35L)) 
:- Filter isnotnull(name#31) 
: +- LocalRelation [id#30L, name#31] 
+- Filter isnotnull(name#36) 
    +- LocalRelation [id#35L, name#36] 

== Physical Plan == 
*BroadcastHashJoin [name#31, id#30L], [name#36, id#35L], Inner, BuildRight 
:- *Filter isnotnull(name#31) 
: +- LocalTableScan [id#30L, name#31] 
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false], input[0, bigint, false])) 
    +- *Filter isnotnull(name#36) 
     +- LocalTableScan [id#35L, name#36]