Implementing a triple LEFT JOIN in Spark SQL with the Java API

I have three Datasets coming from three tables:

Dataset<TABLE1> bbdd_one = map.get("TABLE1").as(Encoders.bean(TABLE1.class)).alias("TABLE1");
Dataset<TABLE2> bbdd_two = map.get("TABLE2").as(Encoders.bean(TABLE2.class)).alias("TABLE2");
Dataset<TABLE3> bbdd_three = map.get("TABLE3").as(Encoders.bean(TABLE3.class)).alias("TABLE3");

I want to perform a triple left join over them and write the result out as Parquet.

The SQL JOIN statement would be something like this:

SELECT one.field, ........, two.field ....., three.field, ... four.field
FROM TABLE1 one
LEFT JOIN TABLE2 two ON two.field = one.field
LEFT JOIN TABLE3 three ON three.field = one.field AND three.otherfield = one.otherfield
LEFT JOIN TABLE3 four ON four.field = one.field AND four.field = one.otherfield
WHERE one.field = 'whatever'

How can I achieve this with the Java API? Is it even possible? I have done an example with a single join, but with three it seems hard.

PS: another join I have done with the Java API is:

Dataset<TJOINED> ds_joined = ds_table1
    .join(ds_table2,
        JavaConversions.asScalaBuffer(Arrays.asList("fieldInCommon1", "fieldInCommon2", "fieldInCommon3", "fieldInCommon4")).seq(),
        "inner")
    .select("a lot of fields", ... "more fields")
    .as(Encoders.bean(TJOINED.class));

Thanks!

Have you tried chaining the join statements? I don't write Java very often, so this is just a guess:

Dataset<TJOINED> ds_joined = ds_table1
    .join(ds_table2,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left")
    .join(ds_table3,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left")
    .join(ds_table4,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left")
    .select(...)
    .as(Encoders.bean(TJOINED.class));
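
One caveat: the Seq-based overload only works when the join columns have the same name on both sides. Since your SQL joins TABLE3 twice on differently named columns of TABLE1, you may instead need the overload that takes an explicit Column condition. A minimal, untested sketch of that variant, where "field", "otherfield" and "otherfield2" are just the placeholders from your SQL, and the same ds_table3 is aliased twice since both three and four come from TABLE3:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Aliases let each join condition name its two sides unambiguously.
Dataset<Row> joined = ds_table1.alias("one")
    .join(ds_table2.alias("two"),
        col("two.field").equalTo(col("one.field")),
        "left")
    .join(ds_table3.alias("three"),
        col("three.field").equalTo(col("one.field"))
            .and(col("three.otherfield").equalTo(col("one.otherfield"))),
        "left")
    .join(ds_table3.alias("four"),
        col("four.field").equalTo(col("one.field"))
            .and(col("four.otherfield").equalTo(col("one.otherfield2"))),
        "left")
    .where(col("one.field").equalTo("whatever"));

Note that joining on a Column condition keeps both sides' join columns in the result, so a final select(...).as(Encoders.bean(...)) is still needed to get back a typed Dataset.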

Update: if my understanding is correct, ds_table3 and ds_table4 are the same dataset, just joined on different fields. Then maybe this updated answer, given in Scala since that's what I'm used to, achieves what you want. Here is a complete working example:

import spark.implicits._
case class TABLE1(f1: Int, f2: Int, f3: Int, f4: Int, f5:Int)
case class TABLE2(f1: Int, f2: Int, vTable2: Int)
case class TABLE3(f3: Int, f4: Int, vTable3: Int)
val one = spark.createDataset[TABLE1](Seq(TABLE1(1,2,3,4,5), TABLE1(1,3,4,5,6)))
//one.show()
//+---+---+---+---+---+
//| f1| f2| f3| f4| f5|
//+---+---+---+---+---+
//|  1|  2|  3|  4|  5|
//|  1|  3|  4|  5|  6|
//+---+---+---+---+---+
val two = spark.createDataset[TABLE2](Seq(TABLE2(1,2,20)))
//two.show()
//+---+---+-------+
//| f1| f2|vTable2|
//+---+---+-------+
//|  1|  2|     20|
//+---+---+-------+
val three = spark.createDataset[TABLE3](Seq(TABLE3(3,4,20), TABLE3(3,5,50)))
//three.show()
//+---+---+-------+
//| f3| f4|vTable3|
//+---+---+-------+
//|  3|  4|     20|
//|  3|  5|     50|
//+---+---+-------+
val result = one
  .join(two, Seq("f1", "f2"), "left")
  .join(three, Seq("f3", "f4"), "left")
  .join(
    three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
    Seq("f3", "f5"),
    "left"
  )
//result.show()
//+---+---+---+---+---+-------+-------+-------+
//| f3| f5| f4| f1| f2|vTable2|vTable3|vTable4|
//+---+---+---+---+---+-------+-------+-------+
//|  3|  5|  4|  1|  2|     20|     20|     50|
//|  4|  6|  5|  1|  3|   null|   null|   null|
//+---+---+---+---+---+-------+-------+-------+
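
And since the question asks for the Java API, here is a hedged, untested Java translation of the Scala example above, including the Parquet write the question mentions. The table loading via spark.table(...) and the output path are assumptions for illustration only:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.collection.JavaConversions;

public class TripleLeftJoin {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("triple-left-join")
            .master("local[*]") // assumption: local run, for the sketch only
            .getOrCreate();

        // Assumption: the inputs correspond to the datasets `one`, `two`,
        // `three` from the Scala example and are available as tables/views.
        Dataset<Row> one = spark.table("TABLE1");
        Dataset<Row> two = spark.table("TABLE2");
        Dataset<Row> three = spark.table("TABLE3");

        Dataset<Row> result = one
            .join(two,
                JavaConversions.asScalaBuffer(Arrays.asList("f1", "f2")).seq(),
                "left")
            .join(three,
                JavaConversions.asScalaBuffer(Arrays.asList("f3", "f4")).seq(),
                "left")
            // The same table joined a second time under renamed columns,
            // exactly as in the Scala example.
            .join(three.withColumnRenamed("f4", "f5")
                       .withColumnRenamed("vTable3", "vTable4"),
                JavaConversions.asScalaBuffer(Arrays.asList("f3", "f5")).seq(),
                "left");

        // Write the joined result out as Parquet, as the question asks.
        result.write().parquet("output.parquet"); // hypothetical path

        spark.stop();
    }
}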
