我有一个像下面这样的表格:
| id | item |
| -- | ------------------------------------ |
| 1 | {order_id: 1, item_id: 1, price: 10} |
| 2 | {order_id: 1, item_id: 2, price: 11} |
| 3 | {order_id: 2, item_id: 3, price: 12} |
| 4 | {order_id: 2, item_id: 4, price: 13} |
我需要将表中的行聚合为以下内容:
| order_id | order |
| -------- | ------------------------------------------------------------------------ |
| 1 | {order_id: 1, items: [{item_id: 1, price: 10}, {item_id: 2, price: 11}]} |
| 2 | {order_id: 2, items: [{item_id: 3, price: 12}, {item_id: 4, price: 13}]} |
最初我认为UDAF可以做到这一点,但是当我实现一个聚合器UDAF函数时,我不确定在合并方法中返回什么,如果订单id不同,它们不能合并。
从Spark 1.6及更高版本开始,您不需要UDAF,您可以使用内置SQL函数collect_list聚合行对象
如果您的表模式如下:
root
|-- id: integer (nullable = false)
|-- item: struct (nullable = true)
| |-- order_id: integer (nullable = true)
| |-- item_id: integer (nullable = true)
| |-- price: double (nullable = true)
在dataframe
中加载你的表后,你的代码应该是(在scala中):
import org.apache.spark.sql.functions.{collect_list, struct}
dataframe
.groupBy("item.order_id")
.agg(collect_list(struct("item.item_id", "item.price")).as("items"))
.withColumn("order", struct("order_id", "items"))
.drop("items")
假设如下模型:
case class Order(order_id: Int, items: Seq[Item])
case class Item(item_id: Int, price: Double)
case class Line(item: Item)
使用groupBy
按item.order_id
分组行,然后收集项:
import sparkSession.implicits._
df.groupBy($"item.order_id")
.as[Int, Line]
.mapGroups { case (order_id, lines) =>
(order_id, Order(order_id, lines.toSeq.map(line => Item(line.item.item_id, line.item.price))))
}