输入:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[( 1, 'aa', [None, 9]),
( 1, None, [ 9, 1]),
( 1, 'bb', [ 1, 4]),
( 1, 'cc', [ 4, 5]),
( 2, 'ee', [None, 2]),
( 2, None, [ 2, 8]),
( 2, 'dd', [ 8, 7]),
( 2, None, [ 7, 1])],
['col_id', 'col_val', 'col_arr'])
所需结果-我想按col_id
分组,并从col_val
:返回最后一个非空项
+------+-------+
|col_id|col_val|
+------+-------+
| 1| cc|
| 2| dd|
+------+-------+
问题出在订单栏上。它是一个数组,其中的最后一个元素作为下一行的第一个元素重复。在上面的示例中,col_id
=2的顺序为:[None, 2]
、[2, 8]
、[8, 7]
、[7, 1]
。
由于[7, 1]
的col_val
为空,因此应返回[8, 7]
的结果,即'dd'
。排序总是以null(无)开头。
我试过
df = (df
.filter(~F.isnull('col_val'))
.groupBy('col_id')
.agg(F.max_by('col_val', F.col('col_arr')[1]))
)
df.show()
# +------+---------------------------+
# |col_id|max_by(col_val, col_arr[1])|
# +------+---------------------------+
# | 1| aa|
# | 2| dd|
# +------+---------------------------+
这并不成功,因为我的顺序列没有遵循简单的升序/降序。
因此,经过深思熟虑,我找到了一种可行的方法。步骤:
- 将每个
col_id
的修改行(作为结构)收集到列表中 - 为每个
col_id
创建映射,并将内部列表的第一个元素作为键 - 映射中的顺序查找;循环";通过数组中的元素创建有序列表
- 删除null并提取最后一项
from pyspark.sql import functions as F, Window as W
df = df.withColumn('col_arr', F.transform('col_arr', lambda x: F.coalesce(x, F.lit(-9))))
inner_struct = F.struct('col_val', F.col('col_arr')[1].alias('last'))
c = F.collect_set(F.struct(F.col('col_arr')[0], inner_struct))
df = df.groupBy('col_id').agg(
F.element_at(F.filter(F.aggregate(
c,
F.expr("array(struct(string(null) col_val, -9L last))"),
lambda acc, x: F.array_union(
acc,
F.array(F.map_from_entries(c)[F.element_at(acc, -1)['last']])
)
), lambda x: x.col_val.isNotNull()), -1).col_val.alias('col_val')
)
df.show()
# +------+-------+
# |col_id|col_val|
# +------+-------+
# | 1| cc|
# | 2| dd|
# +------+-------+