In PySpark, how do I groupBy and collect a list of all the distinct structs contained in an array column?



I have a table with the following schema:

root
|-- match_keys: array (nullable = false)
|    |-- element: struct (containsNull = false)
|    |    |-- key: string (nullable = false)
|    |    |-- entity1: string (nullable = true)
|    |    |-- entity2: string (nullable = true)
|-- src: string (nullable = true)
|-- dst: string (nullable = true)

Here is an example:

src |dst| match_keys
----------------------------------------------------------------------------
a1  |d1 | [{"key": "name", "entity1": "john", "entity2": "john"}]
a1  |d1 | [{"key": "name", "entity1": "john", "entity2": "john"},
           {"key": "dob", "entity1": "21/01/1999", "entity2": "21/01/1999"}]
a1  |d1 | [{"key": "name", "entity1": "john", "entity2": "john"},
           {"key": "country", "entity1": "IT", "entity2": "IT"}]

What I'm looking for is:

src |dst| match_keys
----------------------------------------------------------------------------
a1  |d1 | [{"key": "name", "entity1": "john", "entity2": "john"},
           {"key": "dob", "entity1": "21/01/1999", "entity2": "21/01/1999"},
           {"key": "country", "entity1": "IT", "entity2": "IT"}]

Of course, I tried:

from pyspark.sql import functions as F

(df
    .groupBy("src", "dst")
    .agg(F.flatten(F.collect_set("match_keys")).alias("match_keys"))
).show(truncate=False)

But this produces the following result (with all the duplicated structs, e.g. the repeated name entry):

src |dst| match_keys
----------------------------------------------------------------------------
a1  |d1 | [{"key": "name", "entity1": "john", "entity2": "john"},
           {"key": "name", "entity1": "john", "entity2": "john"},
           {"key": "dob", "entity1": "21/01/1999", "entity2": "21/01/1999"},
           {"key": "name", "entity1": "john", "entity2": "john"},
           {"key": "country", "entity1": "IT", "entity2": "IT"}]

What you need is the array_distinct function. collect_set only de-duplicates the collected arrays as whole values, so after flatten the individual structs can still repeat; array_distinct removes those element-level duplicates.

df = (df
    .groupBy("src", "dst")
    .agg(
        F.array_distinct(
            F.flatten(F.collect_set(F.col("match_keys")))
        ).alias("match_keys")
    )
)
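
For completeness, here is a minimal end-to-end sketch. The toy DataFrame is rebuilt from the sample rows in the question, and Spark 2.4+ is assumed, since flatten and array_distinct were added in that release:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Toy data rebuilt from the sample rows above.
df = spark.createDataFrame(
    [
        ("a1", "d1", [("name", "john", "john")]),
        ("a1", "d1", [("name", "john", "john"),
                      ("dob", "21/01/1999", "21/01/1999")]),
        ("a1", "d1", [("name", "john", "john"),
                      ("country", "IT", "IT")]),
    ],
    "src string, dst string, "
    "match_keys array<struct<key:string, entity1:string, entity2:string>>",
)

# collect_set gathers the arrays per (src, dst) group, flatten concatenates
# them into a single array, and array_distinct drops the repeated structs.
result = df.groupBy("src", "dst").agg(
    F.array_distinct(F.flatten(F.collect_set("match_keys"))).alias("match_keys")
)
result.show(truncate=False)

Note that collect_list would work just as well as collect_set here, since array_distinct removes the duplicates either way; also, the order of the structs in the resulting array is not guaranteed.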
