在PySpark中使用数组对列进行Group by和aggregate



我有下面的PySpark数据框架。Column_2是复杂的数据类型array<string,bigint>>

Column_1 Column_2                 Column_3
A        [{Mat=7},{Phy=8}]        ABC
A        [{Mat=7},{Phy=8}]        CDE
B        [{Mat=6},{Phy=7}]        ZZZ

我必须对列1和列2进行分组,并得到列3的最小集合。

问题是当我尝试按列1和列2分组时它会给我一个错误

不能用作分组表达式,因为数据类型不是可排序数据类型

是否有办法将该列包含在group by中或以某种方式将其聚合。对于column_1

中的键值,column_2中的值将始终相同。预期输出:

Column_1 Column_2                Column_3
A        [{Mat=7},{Phy=8}]       ABC
B        [{Mat=6},{Phy=7}]       ZZZ

是否有可能在聚合函数中收集所有值的列表并将其扁平化并删除重复项?

column_2中的值与column_1中的键值始终相同

如果是,那么就取组中的first值。

测试dataframe:

from pyspark.sql import functions as F
df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 7, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)
print(df.dtypes)
# +--------+------------------------+--------+
# |Column_1|Column_2                |Column_3|
# +--------+------------------------+--------+
# |A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
# |A       |[{Mat -> 7}, {Phy -> 8}]|CDE     |
# |B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
# +--------+------------------------+--------+
# [('Column_1', 'string'), ('Column_2', 'array<map<string,bigint>>'), ('Column_3', 'string')]

聚合:

df2 = df.groupBy('Column_1').agg(
F.first('Column_2').alias('Column_2'),
F.min('Column_3').alias('Column_3')
)
df2.show(truncate=False)
# +--------+------------------------+--------+
# |Column_1|Column_2                |Column_3|
# +--------+------------------------+--------+
# |A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
# |B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
# +--------+------------------------+--------+
我可能误解了你的问题。如果我这样做了,没有人会受益。

我以为您想选择Column_2中值的最小总和。因此,我稍微修改了数据框,以确保组A具有多个值。看到df

df = spark.createDataFrame(
[('A', 'ABC', 7, 8),
('A', 'CDE', 3, 8),
('B', 'ZZZ', 6, 7)],
['Column_1', 'Column_3', 'm', 'p'])
df = df.select(
'Column_1',
F.array(F.create_map(F.lit('Mat'), 'm'), F.create_map(F.lit('Phy'), 'p')).alias('Column_2'),
'Column_3'
)
df.show(truncate=False)  

df

+--------+------------------------+--------+
|Column_1|Column_2                |Column_3|
+--------+------------------------+--------+
|A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
|A       |[{Mat -> 3}, {Phy -> 8}]|CDE     |
|B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
+--------+------------------------+--------+

解决方案

If my assumption is right

  1. Column_2中键值对的值提取到filter列中
  2. 将它们相加。保存出来在filter
  3. Column_1filter排序
  4. 删除子集Column_1的副本
  5. 下面

代码

new = df.withColumn("filter",F.expr("aggregate(transform(Column_2,x -> map_values(x)[0] ),cast(0 as bigint),(x,i)->x+i)")).orderBy('Column_1',desc('filter')).dropDuplicates(['Column_1']).drop('filter')
new.show()

+--------+------------------------+--------+
|Column_1|Column_2                |Column_3|
+--------+------------------------+--------+
|A       |[{Mat -> 7}, {Phy -> 8}]|ABC     |
|B       |[{Mat -> 6}, {Phy -> 7}]|ZZZ     |
+--------+------------------------+--------+

最新更新