我有一个数组类型列的数据集。从这个列中,我们需要创建另一个列,该列将包含唯一元素及其计数的列表。
示例[a,b,e,b]
的结果应该是[[b,a,e],[2,1,1]]
。数据应按计数排序。即使是键值,其中值是计数也可以。我为此目的创建了一个udf
(请见下文),但它很慢,所以我需要在PySpark内置函数中这样做。
collected_col_a[a, b, e, b] [a, b, e, b]
您可以结合使用transform
和filter
函数以及array_distinct
和size
来获得所需的输出。下面是一个例子:
from pyspark.sql import functions as F
# example of input dataframe
df = spark.createDataFrame([(1, ["a", "b", "e", "b"]), (2, ["a", "a", "c", "b"])], ["id", "arrayCol"])
df1 = df.withColumn(
"uniqueCount",
F.transform(
F.array_distinct("arrayCol"),
lambda x: F.struct(
x.alias("value"),
F.size(F.filter("arrayCol", lambda y: x == y)).alias("cout")
)
)
)
df1.show(truncate=False)
#+---+------------+------------------------+
#|id |arrayCol |uniqueCount |
#+---+------------+------------------------+
#|1 |[a, b, e, b]|[{a, 1}, {b, 2}, {e, 1}]|
#|2 |[a, a, c, b]|[{a, 2}, {c, 1}, {b, 1}]|
#+---+------------+------------------------+
创建地图的方法。使用aggregate
和map_zip_with
。另一种方法似乎更清晰。
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, 'a', ['a', 'b', 'e', 'b']),
(1, 'b', ['a', 'b', 'e', 'b'])],
['id', 'col_a', 'collected_col_a']
)
df = df.withColumn('elem_count',
F.aggregate(
'collected_col_a',
F.lit(None).cast('map<string,int>'),
lambda m, x: F.map_zip_with(
F.coalesce(m, F.create_map(x, F.lit(0))),
F.create_map(x, F.lit(1)),
lambda k, v1, v2: F.coalesce(v1, F.lit(0)) + F.coalesce(v2, F.lit(0))
)
)
)
df.show(truncate=0)
# +---+-----+---------------+------------------------+
# |id |col_a|collected_col_a|elem_count |
# +---+-----+---------------+------------------------+
# |1 |a |[a, b, e, b] |{a -> 1, b -> 2, e -> 1}|
# |1 |b |[a, b, e, b] |{a -> 1, b -> 2, e -> 1}|
# +---+-----+---------------+------------------------+
对不起,我不知道如何根据地图值排序。