我有一个PySpark数据框架看起来像这样,我有一个地图数据类型列Map<Str,Int>
Date Item (Map<Str,int>) Total Items ColA
2021-02-01 Item_A -> 3, Item_B -> 10, Item_C -> 2 15 10
2021-02-02 Item_A -> 1, Item_D -> 5, Item_E -> 7 13 20
2021-02-03 Item_A -> 8, Item_E -> 3, Item_C -> 1 12 30
我想对包括map列在内的所有列求和。对于map列,应该根据键值计算总和。
我想要这样的东西:
[[Item_A -> 12, Item_B -> 10, Item_C -> 3, Item_D -> 5, Item_E -> 10], 40, 60]
不一定是列表的列表,但我想要列的和。
我的方法:
df.rdd.map(lambda x: (1,x[1])).reduceByKey(lambda x,y: x + y).collect()[0][1]
您可以分别对map列和其他列进行聚合,因为您需要对Items列进行explode
,然后需要求和的其他列将变得难以处理。
dataframe例子:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('2021-02-01', {'Item_A': 3, 'Item_B': 10, 'Item_C': 2}, 15, 10),
('2021-02-02', {'Item_A': 1, 'Item_D': 5, 'Item_E': 7}, 13, 20),
('2021-02-03', {'Item_A': 8, 'Item_E': 3, 'Item_C': 1}, 12, 30)],
['Date', 'Item', 'Total Items', 'ColA'])
df.show(truncate=0)
# +----------+----------------------------------------+-----------+----+
# |Date |Item |Total Items|ColA|
# +----------+----------------------------------------+-----------+----+
# |2021-02-01|{Item_C -> 2, Item_B -> 10, Item_A -> 3}|15 |10 |
# |2021-02-02|{Item_E -> 7, Item_D -> 5, Item_A -> 1} |13 |20 |
# |2021-02-03|{Item_E -> 3, Item_C -> 1, Item_A -> 8} |12 |30 |
# +----------+----------------------------------------+-----------+----+
脚本:
aggs = df.agg(F.sum('Total Items'), F.sum('ColA')).head()
df = (df
.select('*', F.explode('Item'))
.groupBy('key')
.agg(F.sum('value').alias('value'))
.select(
F.map_from_entries(F.collect_set(F.struct('key', 'value'))).alias('Item'),
F.lit(aggs[0]).alias('Total Items'),
F.lit(aggs[1]).alias('ColA'),
)
)
df.show(truncate=0)
# +--------------------------------------------------------------------+-----------+----+
# |Item |Total Items|ColA|
# +--------------------------------------------------------------------+-----------+----+
# |{Item_C -> 3, Item_E -> 10, Item_A -> 12, Item_B -> 10, Item_D -> 5}|40 |60 |
# +--------------------------------------------------------------------+-----------+----+