我有一个问题,使用逐行SAS方法自然可以解决,但我一直使用Pyspark。我有一个按时间排序的事件数据集,例如:
test_df = pd.DataFrame({'event_list':[["H"], ["H"], ["H","F"], ["F"], ["F"], ["H"], ["W"], ["W"]], 'time_order':[1,2,3,4,5,6,7,8], 'person':[1,1,1,1,1,1,1,1]})
test_df = spark.createDataFrame(test_df)
test_df.show()
+----------+----------+------+
|event_list|time_order|person|
+----------+----------+------+
| [H]| 1| 1|
| [H]| 2| 1|
| [H, F]| 3| 1|
| [F]| 4| 1|
| [F]| 5| 1|
| [H]| 6| 1|
| [W]| 7| 1|
| [W]| 8| 1|
+----------+----------+------+
我想将这些事件分组为多个事件集,其中初始事件之后的所有事件都是初始事件列表的一部分。因此,在我的test_df中,我预计会有3集:
+----------+----------+------+-------+
|event_list|time_order|person|episode|
+----------+----------+------+-------+
| [H]| 1| 1| 1|
| [H]| 2| 1| 1|
| [H, F]| 3| 1| 2|
| [F]| 4| 1| 2|
| [F]| 5| 1| 2|
| [H]| 6| 1| 2|
| [W]| 7| 1| 3|
| [W]| 8| 1| 3|
+----------+----------+------+-------+
在SAS中,我将保留event_list
的前一行的值,并且如果当前事件列表包含在前一事件列表中,则我将保留当前事件列表值而不是前一事件清单。例如,我保留的值将是[零,["H"],["H"],"H","F"],【"H"、"F"】,【"H"、"F"】,["W"]]。然后我可以通过跟踪保留值的变化来生成剧集。
在Pyspark中,我不知道如何跨行操作顺序地保留信息。。。这可能吗?我尝试使用窗口函数(按person
分区和按time_order
排序(失败了。如何在Pyspark中解决此问题?
如果您使用的是spark版本>=2.4,在窗口上的event_list列上使用collect_list
,flatten
,使用array_distinct
删除重复项,最后使用size
计算一段时间内不同事件的数量。它应该是这样的:
from pyspark.sql.functions import col, collect_list, flatten, array_distinct, size
from pyspark.sql.window import Window
w = Window.partitionBy('person').orderBy('time_order').rowsBetween(Window.unboundedPreceding, 0)
test_df = test_df.withColumn('episode', size(array_distinct(flatten(collect_list(col('event_list')).over(w)))))
test_df.show()
+----------+----------+------+-------+
|event_list|time_order|person|episode|
+----------+----------+------+-------+
| [H]| 1| 1| 1|
| [H]| 2| 1| 1|
| [H, F]| 3| 1| 2|
| [F]| 4| 1| 2|
| [F]| 5| 1| 2|
| [H]| 6| 1| 2|
| [W]| 7| 1| 3|
| [W]| 8| 1| 3|
+----------+----------+------+-------+