I have a table like the one below:
transaction_id | transaction_date | partition_key | amount | record_id | record_in_date
---|---|---|---|---|---
1 | 2021-09-21 | 1 | 1 | 1 | 2021-09-20
1 | 2021-09-21 | 1 | 1 | 2 | 2021-09-20
2 | 2021-09-21 | 2 | 1 | 3 | 2021-09-20
2 | 2021-09-21 | 1 | 1 | 4 | 2021-09-20
2 | 2021-09-21 | 1 | 1 | 5 | 2021-09-20
3 | 2021-09-21 | 2 | 1 | 6 | 2021-09-20
You can run an inner aggregation first, then aggregate the result again:
```python
from pyspark.sql import functions as F

df = ...  # your input DataFrame

# First aggregate per partition (summing amounts and collecting the records),
# then collect those per-partition results per transaction.
df1 = (
    df.groupBy("transaction_id", "transaction_date", "partition_key")
      .agg(F.sum("amount").alias("record_amount_sum"),
           F.collect_list(F.struct("record_id", "amount", "record_in_date")).alias("records"))
      .groupBy("transaction_id", "transaction_date")
      .agg(F.collect_list(F.struct("partition_key", "record_amount_sum", "records"))
           .alias("transaction_partition"))
)

df1.orderBy("transaction_id").toJSON().collect()
```
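If you want to test this end to end, here is a minimal sketch that builds the sample data from the table above (the integer ids/amounts and string-typed dates are assumptions; adjust to your real schema):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows matching the table above (column types are assumptions).
df = spark.createDataFrame(
    [(1, "2021-09-21", 1, 1, 1, "2021-09-20"),
     (1, "2021-09-21", 1, 1, 2, "2021-09-20"),
     (2, "2021-09-21", 2, 1, 3, "2021-09-20"),
     (2, "2021-09-21", 1, 1, 4, "2021-09-20"),
     (2, "2021-09-21", 1, 1, 5, "2021-09-20"),
     (3, "2021-09-21", 2, 1, 6, "2021-09-20")],
    ["transaction_id", "transaction_date", "partition_key",
     "amount", "record_id", "record_in_date"],
)
```

The collected result is one JSON document per transaction, with a `transaction_partition` array of per-partition structs, each holding its `record_amount_sum` and its `records` array.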
Alternatively, you can try the following Spark SQL query:
```sql
-- Outer query: collect the per-partition results per transaction.
SELECT
  transaction_id,
  transaction_date,
  collect_list(
    STRUCT(partition_key, record_amount_sum, records)
  ) AS transaction_partition
FROM (
  -- Inner query: per partition, sum the amounts and collect the records.
  SELECT
    transaction_id,
    transaction_date,
    partition_key,
    SUM(amount) AS record_amount_sum,
    collect_list(
      STRUCT(record_id, amount AS record_amount, record_in_date)
    ) AS records
  FROM my_temp_view
  GROUP BY transaction_id, transaction_date, partition_key
) t
GROUP BY transaction_id, transaction_date
```
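To run this from PySpark, register the DataFrame under the view name the query expects and execute it (`sql_query` below is a hypothetical variable holding the statement above):

```python
# Register the input DataFrame as the temp view referenced in the query.
df.createOrReplaceTempView("my_temp_view")

# sql_query is assumed to contain the SELECT statement shown above.
result = spark.sql(sql_query)
result.orderBy("transaction_id").toJSON().collect()
```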
Let me know if this works for you.