How to find the most frequent element in a list in PySpark?



I have a PySpark dataframe with two columns, ID and Elements. The "Elements" column contains a list of elements. It looks like this:

ID | Elements
_______________________________________
X  |[Element5, Element1, Element5]
Y  |[Element Unknown, Element Unknown, Element_Z]

I want to create a column containing the most frequently occurring element from the 'Elements' column. The output should look like this:

ID | Elements                                           | Output_column 
__________________________________________________________________________
X  |[Element5, Element1, Element5]                      | Element5
Y  |[Element Unknown, Element Unknown, Element_Z]       | Element Unknown 

How can I do this with PySpark?

Thanks in advance.

We can use higher-order functions here (available from Spark 2.4+):

  1. First use transform with aggregate to get the count of each distinct value in the array.
  2. Then sort by count in descending order and take the first element.
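
For reference, here is a minimal sketch of a sample dataframe to run the code below against (column names taken from the question; the extra Z row with an empty array is an assumption, added only to exercise the empty case shown in the output):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the question, plus a "Z" row with an empty array
# so the empty-array case in the output below is reproducible.
df = spark.createDataFrame(
    [
        ("X", ["Element5", "Element1", "Element5"]),
        ("Y", ["Element Unknown", "Element Unknown", "Element_Z"]),
        ("Z", []),
    ],
    schema="ID string, Elements array<string>",
)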

from pyspark.sql import functions as F

temp = (df
        # collect the distinct values present in the array
        .withColumn("Dist", F.array_distinct("Elements"))
        # for each distinct value, count its occurrences in Elements
        .withColumn("Counts", F.expr("""transform(Dist, x ->
            aggregate(Elements, 0, (acc, y) -> IF(y = x, acc + 1, acc)))"""))
        # zip each distinct value with its count into an array of structs
        .withColumn("Map", F.arrays_zip("Dist", "Counts"))
        ).drop("Dist", "Counts")

out = temp.withColumn("Output_column",
        # sort the structs by count descending and take the first value
        F.expr("""element_at(array_sort(Map, (first, second) ->
            CASE WHEN first['Counts'] > second['Counts'] THEN -1 ELSE 1 END), 1)['Dist']"""))

Output:

Note that I added a row with an empty array for ID Z for testing. You can also remove the Map column by adding .drop("Map") to the output.
out.show(truncate=False)
+---+---------------------------------------------+--------------------------------------+---------------+
|ID |Elements                                     |Map                                   |Output_column  |
+---+---------------------------------------------+--------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |[{Element5, 2}, {Element1, 1}]        |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|[{Element Unknown, 2}, {Element_Z, 1}]|Element Unknown|
|Z  |[]                                           |[]                                    |null           |
+---+---------------------------------------------+--------------------------------------+---------------+

For lower Spark versions, you can use a udf with statistics.mode:

from pyspark.sql import functions as F, types as T
from statistics import mode

# mode() returns the most common element; guard against empty arrays
u = F.udf(lambda x: mode(x) if len(x) > 0 else None, T.StringType())
df.withColumn("Output", u("Elements")).show(truncate=False)
+---+---------------------------------------------+---------------+
|ID |Elements                                     |Output         |
+---+---------------------------------------------+---------------+
|X  |[Element5, Element1, Element5]               |Element5       |
|Y  |[Element Unknown, Element Unknown, Element_Z]|Element Unknown|
|Z  |[]                                           |null           |
+---+---------------------------------------------+---------------+
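
One caveat with this udf: on Python versions below 3.8, statistics.mode raises StatisticsError when the list has more than one equally common value. A collections.Counter based udf (a sketch, not part of the original answer; u_counter is just an illustrative name) avoids this by taking the first of the most common elements:

from collections import Counter
from pyspark.sql import functions as F, types as T

# most_common(1) returns [(element, count)]; ties go to the element
# encountered first instead of raising an error.
u_counter = F.udf(
    lambda x: Counter(x).most_common(1)[0][0] if x else None,
    T.StringType(),
)
df.withColumn("Output", u_counter("Elements")).show(truncate=False)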

You can achieve this with PySpark SQL functions. Below is a generic function that adds a new column containing the most common element of another array column. Note that the Python functions.transform / functions.aggregate API used here was added in Spark 3.1; on Spark 2.4 you would use the F.expr form from the answer above.

import pyspark.sql.functions as sf

def add_most_common_val_in_array(df, arraycol, drop=False):
    """Takes a spark df column of ArrayType() and returns the most common element
    in the array in a new column of the df called f"MostCommon_{arraycol}"

    Args:
        df (spark.DataFrame): dataframe
        arraycol (ArrayType()): array column in which you want to find the most common element
        drop (bool, optional): Drop the arraycol after finding most common element. Defaults to False.

    Returns:
        spark.DataFrame: df with additional column containing most common element in arraycol
    """
    dvals = f"distinct_{arraycol}"
    dvalscount = f"distinct_{arraycol}_count"
    startcols = df.columns
    df = df.withColumn(dvals, sf.array_distinct(arraycol))
    df = df.withColumn(
        dvalscount,
        sf.transform(
            dvals,
            lambda uval: sf.aggregate(
                arraycol,
                sf.lit(0),
                lambda acc, entry: sf.when(entry == uval, acc + 1).otherwise(acc),
            ),
        ),
    )
    countercol = f"ReverseCounter{arraycol}"
    df = df.withColumn(countercol, sf.map_from_arrays(dvalscount, dvals))
    mccol = f"MostCommon_{arraycol}"
    df = df.withColumn(mccol, sf.element_at(countercol, sf.array_max(dvalscount)))
    df = df.select(*startcols, mccol)
    if drop:
        df = df.drop(arraycol)
    return df
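
A minimal usage sketch, assuming the sample df defined earlier:

result = add_most_common_val_in_array(df, "Elements")
result.show(truncate=False)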
