排序数组列,其中最后一项等于下一行的第一项



输入:

from pyspark.sql import functions as F
df = spark.createDataFrame(
[(      1,       'aa',  [None, 9]),
(      1,       None,  [   9, 1]),
(      1,       'bb',  [   1, 4]),
(      1,       'cc',  [   4, 5]),
(      2,       'ee',  [None, 2]),
(      2,       None,  [   2, 8]),
(      2,       'dd',  [   8, 7]),
(      2,       None,  [   7, 1])],
['col_id',  'col_val',   'col_arr'])

所需结果-我想按col_id分组,并从col_val:返回最后一个非空项

+------+-------+
|col_id|col_val|
+------+-------+
|     1|     cc|
|     2|     dd|
+------+-------+

问题出在订单栏上。它是一个数组,其中的最后一个元素作为下一行的第一个元素重复。在上面的示例中,col_id=2的顺序为:
[None, 2][2, 8][8, 7][7, 1]

由于[7, 1]col_val为空,因此应返回[8, 7]的结果,即'dd'。排序总是以null(无)开头。

我试过

df = (df
.filter(~F.isnull('col_val'))
.groupBy('col_id')
.agg(F.max_by('col_val', F.col('col_arr')[1]))
)
df.show()
# +------+---------------------------+
# |col_id|max_by(col_val, col_arr[1])|
# +------+---------------------------+
# |     1|                         aa|
# |     2|                         dd|
# +------+---------------------------+

这并不成功,因为我的顺序列没有遵循简单的升序/降序。

因此,经过深思熟虑,我找到了一种可行的方法。步骤:

  • 将每个col_id的修改行(作为结构)收集到列表中
  • 为每个col_id创建映射,并将内部列表的第一个元素作为键
  • 映射中的顺序查找;循环";通过数组中的元素创建有序列表
  • 删除null并提取最后一项
from pyspark.sql import functions as F, Window as W
df = df.withColumn('col_arr', F.transform('col_arr', lambda x: F.coalesce(x, F.lit(-9))))
inner_struct = F.struct('col_val', F.col('col_arr')[1].alias('last'))
c = F.collect_set(F.struct(F.col('col_arr')[0], inner_struct))
df = df.groupBy('col_id').agg(
F.element_at(F.filter(F.aggregate(
c,
F.expr("array(struct(string(null) col_val, -9L last))"),
lambda acc, x: F.array_union(
acc,
F.array(F.map_from_entries(c)[F.element_at(acc, -1)['last']])
)
), lambda x: x.col_val.isNotNull()), -1).col_val.alias('col_val')
)
df.show()
# +------+-------+
# |col_id|col_val|
# +------+-------+
# |     1|     cc|
# |     2|     dd|
# +------+-------+

相关内容

  • 没有找到相关文章

最新更新