I have two DataFrames that I want to join on the date, time, mid, and binImbalance fields, collecting the corresponding values of timeB and midB into lists.
I tried the following code:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.collect_list

val d1: DataFrame = ???  // first input, with columns date, time, mid, binImbalance, timeB, midB
val d3: DataFrame = ???  // second input, with the same columns plus milliSec

// Rename the join keys on the d3 side so they do not clash with d1's columns
val d2 = d3
  .withColumnRenamed("date", "dateC")
  .withColumnRenamed("milliSec", "milliSecC")
  .withColumnRenamed("mid", "midC")
  .withColumnRenamed("time", "timeC")
  .withColumnRenamed("binImbalance", "binImbalanceC")

d1.join(d2, d1("date") === d2("dateC") &&
            d1("time") === d2("timeC") &&
            d1("mid")  === d2("midC"))
  .groupBy("date", "time", "mid", "binImbalance")
  .agg(collect_list("timeB"), collect_list("midB"))
But this does not work, because I get the error: Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35. At the same time, if I rename one of the timeB columns, I can no longer collect the values from both of them into a single list.
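To make the ambiguity concrete, here is a reproduction sketch built from the sample rows in the tables below (it assumes a SparkSession named spark is already in scope; the qualified-column variant at the end is only an experiment, since it still produces two separate lists rather than the single merged list I am after):

import org.apache.spark.sql.functions.collect_list
import spark.implicits._  // assumes an existing SparkSession named `spark`

// Sample rows taken from the example tables below
val d1 = Seq((1, 1, 10, 1, 4, 10), (2, 2, 20, 2, 5, 11), (3, 3, 30, 3, 6, 12))
  .toDF("date", "time", "mid", "binImbalance", "timeB", "midB")
val d3 = Seq((1, 1, 10, 1, 7, 13), (2, 2, 20, 2, 8, 14), (3, 3, 30, 3, 9, 15))
  .toDF("date", "time", "mid", "binImbalance", "timeB", "midB")

val d2 = d3
  .withColumnRenamed("date", "dateC")
  .withColumnRenamed("time", "timeC")
  .withColumnRenamed("mid", "midC")
  .withColumnRenamed("binImbalance", "binImbalanceC")

// Passing qualified Column objects avoids the "ambiguous" error,
// but the result is still two separate lists, not one merged list
d1.join(d2, d1("date") === d2("dateC") &&
            d1("time") === d2("timeC") &&
            d1("mid")  === d2("midC"))
  .groupBy("date", "time", "mid", "binImbalance")
  .agg(collect_list(d1("timeB")), collect_list(d2("timeB")))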
For example, given the two input DataFrames (the first is d1, the second d3):

d1:
+-----+-----+-----+------------+------+------+
| date| time|  mid|binImbalance| timeB|  midB|
+-----+-----+-----+------------+------+------+
|    1|    1|   10|           1|     4|    10|
|    2|    2|   20|           2|     5|    11|
|    3|    3|   30|           3|     6|    12|
+-----+-----+-----+------------+------+------+

d3:
+-----+-----+-----+------------+------+------+
| date| time|  mid|binImbalance| timeB|  midB|
+-----+-----+-----+------------+------+------+
|    1|    1|   10|           1|     7|    13|
|    2|    2|   20|           2|     8|    14|
|    3|    3|   30|           3|     9|    15|
+-----+-----+-----+------------+------+------+
Result:

+-----+-----+-----+------------+---------+----------+
| date| time|  mid|binImbalance| ListTime|   ListMid|
+-----+-----+-----+------------+---------+----------+
|    1|    1|   10|           1|    [4,7]|   [10,13]|
|    2|    2|   20|           2|    [5,8]|   [11,14]|
|    3|    3|   30|           3|    [6,9]|   [12,15]|
+-----+-----+-----+------------+---------+----------+
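For reference, here is a minimal sketch of a union-based variant (my own idea, assuming d1 and d3 really share the exact schema shown above, which may not hold for the real data): stacking the rows instead of joining them means collect_list only ever sees a single timeB column.

import org.apache.spark.sql.functions.collect_list

// union requires identical schemas; the order of elements inside
// each collected list is not guaranteed
d1.union(d3)
  .groupBy("date", "time", "mid", "binImbalance")
  .agg(collect_list("timeB").as("ListTime"),
       collect_list("midB").as("ListMid"))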
Minimal, Complete, and Verifiable example
d1              d2
id  data        id  data
--  ----        --  ----
 1     1         1     2
 2     4         2     5
 3     6         3     3
Result
id list
-- ----
1 [1,2]
2 [4,5]
3 [6,3]
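For completeness, the minimal-example inputs can be built like this (again assuming a SparkSession named spark):

import spark.implicits._  // assumes an existing SparkSession named `spark`

val d1 = Seq((1, 1), (2, 4), (3, 6)).toDF("id", "data")
val d2 = Seq((1, 2), (2, 5), (3, 3)).toDF("id", "data")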
Solution for the minimal example:
import org.apache.spark.sql.functions.udf

// Pack one value from each side into a two-element list
val aggregateDataFrames = udf((x: Double, y: Double) => Seq(x, y))

// Rename d2's columns so they do not clash with d1's after the join
val d3 = d2.withColumnRenamed("id", "id3")
           .withColumnRenamed("data", "data3")

val joined = d1.join(d3, d1("id") === d3("id3"))

val result = joined
  .withColumn("list", aggregateDataFrames(joined("data"), joined("data3")))
  .select("id", "list")
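Run against the minimal-example data, this should reproduce the Result table (row order may vary, and because the UDF is typed over Double the integer data values are up-cast to doubles):

result.show()
// Approximate output:
// +---+----------+
// | id|      list|
// +---+----------+
// |  1|[1.0, 2.0]|
// |  2|[4.0, 5.0]|
// |  3|[6.0, 3.0]|
// +---+----------+

Note that this UDF hard-codes exactly one matching row per id on each side; if a key could match several rows, some collect_list-style aggregation would still be needed, which is exactly what I am stuck on for the full problem.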