我有下面的PySpark数据框架。Column_2是复杂的数据类型array
Column_1 Column_2 Column_3
A [{Mat=7},{Phy=8}] ABC
A [{Mat=7},{Phy=8}] CDE
B [{Mat=6},{Phy=7}] ZZZ
我必须对列1和列2进行分组,并得到列3的最小集合。
问题是当我尝试按列1和列2分组时它会给我一个错误
不能用作分组表达式,因为数据类型不是可排序数据类型
是否有办法将该列包含在group by中或以某种方式将其聚合。对于column_1
中的键值,column_2中的值将始终相同。预期输出:
Column_1 Column_2 Column_3
A [{Mat=7},{Phy=8}] ABC
B [{Mat=6},{Phy=7}] ZZZ
是否有可能在聚合函数中收集所有值的列表并将其扁平化并删除重复项?
column_2中的值与column_1中的键值始终相同
如果是,那么就取组中的first
值。
测试dataframe:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 7, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)
print(df.dtypes)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |A |[{Mat -> 7}, {Phy -> 8}]|CDE |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
# [('Column_1', 'string'), ('Column_2', 'array<map<string,bigint>>'), ('Column_3', 'string')]
聚合:
df2 = df.groupBy('Column_1').agg(
F.first('Column_2').alias('Column_2'),
F.min('Column_3').alias('Column_3')
)
df2.show(truncate=False)
# +--------+------------------------+--------+
# |Column_1|Column_2 |Column_3|
# +--------+------------------------+--------+
# |A |[{Mat -> 7}, {Phy -> 8}]|ABC |
# |B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
# +--------+------------------------+--------+
我以为您想选择Column_2
中值的最小总和。因此,我稍微修改了数据框,以确保组A
具有多个值。看到df
df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 3, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)
df
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|A |[{Mat -> 3}, {Phy -> 8}]|CDE |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+
解决方案
If my assumption is right
- 将
Column_2
中键值对的值提取到filter
列中 - 将它们相加。保存出来在
filter
Column_1
和filter
排序- 删除子集
Column_1
的副本 下面
代码
new = df.withColumn("filter",F.expr("aggregate(transform(Column_2,x -> map_values(x)[0] ),cast(0 as bigint),(x,i)->x+i)")).orderBy('Column_1',desc('filter')).dropDuplicates(['Column_1']).drop('filter')
new.show()
+--------+------------------------+--------+
|Column_1|Column_2 |Column_3|
+--------+------------------------+--------+
|A |[{Mat -> 7}, {Phy -> 8}]|ABC |
|B |[{Mat -> 6}, {Phy -> 7}]|ZZZ |
+--------+------------------------+--------+