如何在多列上编写pyspark udaf



我在一个名为 end_stats_df的pyspark数据框中具有以下数据:

values     start    end    cat1   cat2
10          1        2      A      B
11          1        2      C      B
12          1        2      D      B
510         1        2      D      C
550         1        2      C      B
500         1        2      A      B
80          1        3      A      B

我想以以下方式汇总:

  • 我想将" start"one_answers" end"列用作聚合键
  • 对于每组行,我需要做以下操作:
    • 为该组计算cat1cat2中的唯一值数。例如,对于start = 1和 end = 2的组,此数字为4,因为有a,b,c,D。此数字将存储为n(在此示例中n = 4)。
    • 对于values字段,对于每个组,我需要对values进行排序,然后选择每个n-1值,其中n是上面第一个操作中存储的值。
    • 在聚合结束时,我不在乎上述操作后的cat1cat2中的内容。

从上面示例输出的示例是:

values     start    end    cat1   cat2
12          1        2      D      B
550         1        2      C      B
80          1        3      A      B

如何使用Pyspark DataFrames完成?我假设我需要使用自定义UDAF,对

pyspark不直接支持UDAF,因此我们必须手动进行聚合。

from pyspark.sql import functions as f
def func(values, cat1, cat2):
    n = len(set(cat1 + cat2))
    return sorted(values)[n - 2]

df = spark.read.load('file:///home/zht/PycharmProjects/test/text_file.txt', format='csv', sep='t', header=True)
df = df.groupBy(df['start'], df['end']).agg(f.collect_list(df['values']).alias('values'),
                                            f.collect_set(df['cat1']).alias('cat1'),
                                            f.collect_set(df['cat2']).alias('cat2'))
df = df.select(df['start'], df['end'], f.UserDefinedFunction(func, StringType())(df['values'], df['cat1'], df['cat2']))

相关内容

  • 没有找到相关文章

最新更新