如何将'groupby(key).agg('与用户定义的函数一起使用?特别是,我需要每个键的所有唯一值的列表[而不是计数]。
collect_set
和collect_list
(分别用于无序和有序结果)可用于逐组处理结果。从一个简单的火花数据帧开始
df = sqlContext.createDataFrame(
[('first-neuron', 1, [0.0, 1.0, 2.0]),
('first-neuron', 2, [1.0, 2.0, 3.0, 4.0])],
("neuron_id", "time", "V"))
假设目标是为每个神经元(按名称分组)返回最长的V列表
from pyspark.sql import functions as F
grouped_df = tile_img_df.groupby('neuron_id').agg(F.collect_list('V'))
我们现在已经将V列表分组为一个列表列表。由于我们想要最长的长度,我们可以运行
import pyspark.sql.types as sq_types
len_udf = F.udf(lambda v_list: int(np.max([len(v) in v_list])),
returnType = sq_types.IntegerType())
max_len_df = grouped_df.withColumn('max_len',len_udf('collect_list(V)'))
要获得添加了V列表的最大长度的max_len列
我找到了pyspark.sql.functions.collect_set(col)
,它完成了我想要的工作。