How to use collect_list



I have two DataFrames that I want to join on the `date`, `time`, `mid`, and `binImbalance` fields, collecting the corresponding values of `timeB` and `midB` into lists.

I tried the following code:

val d1: DataFrame
val d3: DataFrame

val d2 = d3
  .withColumnRenamed("date", "dateC")
  .withColumnRenamed("milliSec", "milliSecC")
  .withColumnRenamed("mid", "midC")
  .withColumnRenamed("time", "timeC")
  .withColumnRenamed("binImbalance", "binImbalanceC")

d1.join(d2, d1("date") === d2("dateC") and
            d1("time") === d2("timeC") and
            d1("mid") === d2("midC"))
  .groupBy("date", "time", "mid", "binImbalance")
  .agg(collect_list("timeB"), collect_list("midB"))

But this does not work, because I get the error: Reference 'timeB' is ambiguous, could be: timeB#16, timeB#35. And if I rename one of the `timeB` columns, I can no longer collect both values into a single list.
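One way to sidestep the ambiguity is to avoid having two `timeB`/`midB` columns in the same joined DataFrame at all: select the grouping keys plus `timeB` and `midB` from each DataFrame, stack the rows with `union`, and then run a single `groupBy` with `collect_list`. This is a sketch, assuming both `d1` and `d3` contain all six columns with these exact names:

```scala
import org.apache.spark.sql.functions.collect_list

// Stack the rows from both DataFrames instead of joining them,
// so "timeB" and "midB" each exist only once in the schema.
val cols = Seq("date", "time", "mid", "binImbalance", "timeB", "midB")
val stacked = d1.select(cols.head, cols.tail: _*)
  .union(d3.select(cols.head, cols.tail: _*))

val result = stacked
  .groupBy("date", "time", "mid", "binImbalance")
  .agg(
    collect_list("timeB").as("ListTime"),
    collect_list("midB").as("ListMid")
  )
```

Note that `collect_list` does not guarantee element order within each list, so `[4,7]` and `[7,4]` are both possible outputs for the same group.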

An example result should be:

+-----+---------+------+------------+---------+------+
| date|     time|   mid|binImbalance|    timeB|  midB|
+-----+---------+------+------------+---------+------+
|  1  |     1   |  10  |           1|    4    |  10  |
|  2  |     2   |  20  |           2|    5    |  11  |
|  3  |     3   |  30  |           3|    6    |  12  |
+-----+---------+------+------------+---------+------+

+-----+---------+------+------------+---------+------+
| date|     time|   mid|binImbalance|    timeB|  midB|
+-----+---------+------+------------+---------+------+
|  1  |     1   |  10  |           1|    7    |  13  |
|  2  |     2   |  20  |           2|    8    |  14  |
|  3  |     3   |  30  |           3|    9    |  15  |
+-----+---------+------+------------+---------+------+

Result:

+-----+---------+------+------------+---------+-----------+
| date|     time|   mid|binImbalance| ListTime|  ListMid  |
+-----+---------+------+------------+---------+-----------+
|  1  |     1   |  10  |           1|   [4,7] |  [10,13]  |
|  2  |     2   |  20  |           2|   [5,8] |  [11,14]  |
|  3  |     3   |  30  |           3|   [6,9] |  [12,15]  |
+-----+---------+------+------------+---------+-----------+

Minimal, complete, and verifiable example

d1            d2
id   data     id   data    
--   ----     --   ----
1    1        1    2
2    4        2    5
3    6        3    3
Result
id   list
--   ----
1    [1,2]
2    [4,5]
3    [6,3]

Solution for the minimal example:

import org.apache.spark.sql.functions.udf

// Pair up the two matching values into a single list column via a UDF
val aggregateDataFrames = udf((x: Double, y: Double) => Seq(x, y))

// Rename d2's columns so the join result has no ambiguous references
val d3 = d2.withColumnRenamed("id", "id3")
           .withColumnRenamed("data", "data3")

val joined = d1.join(d3, d1("id") === d3("id3"))

val result = joined
  .withColumn("list", aggregateDataFrames(joined("data"), joined("data3")))
  .select("id", "list")
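The minimal example can also be solved with `collect_list` itself, which avoids hard-coding the number of source DataFrames in a UDF. This is a sketch under the same assumptions (both frames have `id` and `data` columns):

```scala
import org.apache.spark.sql.functions.collect_list

// Stack both DataFrames, then gather all "data" values per id.
// As above, collect_list makes no ordering guarantee within each list.
val result = d1.union(d2)
  .groupBy("id")
  .agg(collect_list("data").as("list"))
```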
