Combining an array of maps into a single map in a PySpark DataFrame



Is there a function analogous to collect_list or collect_set that aggregates a column of maps into a single map in a (grouped) PySpark DataFrame? For example, such a function might behave as follows:

>>>df.show()
+--+---------------------------------+
|id|                             map |
+--+---------------------------------+
| 1|                    Map(k1 -> v1)|
| 1|                    Map(k2 -> v2)|
| 1|                    Map(k3 -> v3)|
| 2|                    Map(k5 -> v5)|
| 3|                    Map(k6 -> v6)|
| 3|                    Map(k7 -> v7)|
+--+---------------------------------+
>>>df.groupBy('id').agg(collect_map('map')).show()
+--+----------------------------------+
|id|                 collect_map(map) |
+--+----------------------------------+
| 1| Map(k1 -> v1, k2 -> v2, k3 -> v3)|
| 2|                     Map(k5 -> v5)|
| 3|           Map(k6 -> v6, k7 -> v7)|
+--+----------------------------------+

It probably wouldn't be hard to produce the desired result using one of the other collect_ aggregations and a UDF, but it seems like something like this should already exist.

I know it's probably bad form to answer your own question before others have had a chance to respond, but in case anyone is looking for a UDF-based version, here is one possible answer.

from pyspark.sql.functions import udf, collect_list
from pyspark.sql.types import MapType, StringType

# Merge a list of maps into one dict; on duplicate keys, later maps win
combineMap = udf(lambda maps: {key: m[key] for m in maps for key in m},
                 MapType(StringType(), StringType()))

(df
 .groupBy('id')
 .agg(collect_list('map').alias('maps'))
 .select('id', combineMap('maps').alias('combined_map'))
 .show())

A suggested solution, since map_concat does not work on its own; this solution does not use UDFs.

For Spark >= 2.4:

import pyspark.sql.functions as f

(df
 .groupBy(f.col('id'))
 .agg(f.collect_list(f.col('map')).alias('maps'))
 .select(
     'id',
     f.expr('aggregate(slice(maps, 2, size(maps)), maps[0], '
            '(acc, element) -> map_concat(acc, element))').alias('mapsConcatenated')
 ))
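For Spark >= 3.1, the same fold can be written with the DataFrame API's higher-order aggregate function instead of a SQL expression string. A minimal sketch, assuming the same df with an id column and a map column named map:

import pyspark.sql.functions as f

# Sketch for Spark >= 3.1: f.aggregate folds the collected maps with
# map_concat, starting from an empty map<string,string> accumulator
(df
 .groupBy('id')
 .agg(f.collect_list('map').alias('maps'))
 .select(
     'id',
     f.aggregate(
         'maps',
         f.create_map().cast('map<string,string>'),  # empty initial accumulator
         lambda acc, m: f.map_concat(acc, m)
     ).alias('combined_map')
 ))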

collect_list ignores null values, so there is no need to worry about them when using map_concat in the aggregate function.
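A quick illustration of that point, using a hypothetical frame df_nulls that contains a null map:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()

# Hypothetical example: group 1 contains a null map
df_nulls = spark.createDataFrame(
    [(1, {'k1': 'v1'}), (1, None), (2, {'k5': 'v5'})],
    'id int, map map<string,string>')

# collect_list silently drops the null row, so the fold above
# only ever sees real maps
df_nulls.groupBy('id').agg(f.collect_list('map').alias('maps')).show(truncate=False)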

Using the map<string,string> data type:

val df = Seq(
  (1, "k1", "v1"),
  (1, "k2", "v2"),
  (1, "k3", "v3"),
  (2, "k5", "v5"),
  (3, "k6", "v6"),
  (3, "k7", "v7")
).toDF("id", "k", "v")
df.show()
+---+---+---+
| id|  k|  v|
+---+---+---+
|  1| k1| v1|
|  1| k2| v2|
|  1| k3| v3|
|  2| k5| v5|
|  3| k6| v6|
|  3| k7| v7|
+---+---+---+
df.createOrReplaceTempView("id_map")
spark.sql("""
with t1 as ( select *, map(k,v) m from id_map ),
     t2 as ( select id, collect_list(m) m1 from t1 group by id )
  select id, aggregate(m1, cast(map() as map<string,string>), (acc,x) -> map_concat(acc,x)) m2 from t2
""").show(false)
+---+------------------------------+
|id |m2                            |
+---+------------------------------+
|1  |{k1 -> v1, k2 -> v2, k3 -> v3}|
|2  |{k5 -> v5}                    |
|3  |{k6 -> v6, k7 -> v7}          |
+---+------------------------------+

Or using struct() and map_from_entries():

spark.sql("""
with t1 as ( select *, struct(k,v) m from id_map ),
     t2 as ( select id, collect_list(m) m1 from t1 group by id )
  select id, map_from_entries(m1) m2 from t2
""").show(false)
+---+------------------------------+
|id |m2                            |
+---+------------------------------+
|1  |{k1 -> v1, k2 -> v2, k3 -> v3}|
|2  |{k5 -> v5}                    |
|3  |{k6 -> v6, k7 -> v7}          |
+---+------------------------------+
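The same struct()/map_from_entries() idea also works directly in the PySpark DataFrame API. A minimal sketch, assuming a PySpark DataFrame df with the same id/k/v columns as the snippet above:

import pyspark.sql.functions as f

# Sketch: build (k, v) structs, collect them per id, then turn the
# resulting array of entries into a single map (Spark >= 2.4)
(df
 .groupBy('id')
 .agg(f.map_from_entries(f.collect_list(f.struct('k', 'v'))).alias('m2'))
 .show(truncate=False))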

It's map_concat, available in PySpark version >= 2.4.
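For reference, a minimal sketch of map_concat itself: it merges maps within a single row and is not an aggregate, which is why the answers above pair it with collect_list plus a fold to merge maps across rows.

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# map_concat combines two map columns on the same row
spark.range(1).select(
    f.map_concat(
        f.create_map(f.lit('a'), f.lit('1')),
        f.create_map(f.lit('b'), f.lit('2'))
    ).alias('merged')
).show()
# -> {a -> 1, b -> 2}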
