自定义窗口函数



我目前正试图在PySpark数据框中提取一系列连续出现的事件,并对它们进行排序/排序,如下所示(为了方便,我已按user_idtimestamp对初始数据框进行排序):

df_ini
+-------+--------------------+------------+
|user_id|     timestamp      |  actions   |
+-------+--------------------+------------+
| 217498|           100000001|    'A'     |
| 217498|           100000025|    'A'     |
| 217498|           100000124|    'A'     |
| 217498|           100000152|    'B'     |
| 217498|           100000165|    'C'     |
| 217498|           100000177|    'C'     |
| 217498|           100000182|    'A'     |
| 217498|           100000197|    'B'     |
| 217498|           100000210|    'B'     |
| 854123|           100000005|    'A'     |
| 854123|           100000007|    'A'     |
| etc.

:

expected df_transformed
+-------+------------+------------+------------+
|user_id|  actions   | nb_of_occ  |    order   |
+-------+------------+------------+------------+
| 217498|    'A'     |      3     |     1      |
| 217498|    'B'     |      1     |     2      |
| 217498|    'C'     |      2     |     3      |
| 217498|    'A'     |      1     |     4      |
| 217498|    'B'     |      2     |     5      |
| 854123|    'A'     |      2     |     1      |
| etc.

我的猜测是,我必须使用一个智能窗口函数分区表的user_id和动作,但只有当这些动作是连续的时间 !我不知道该怎么做……

如果有人在PySpark中遇到过这种类型的转换,我很乐意得到一个提示!

欢呼

这是一个非常常见的模式,可以通过几个步骤使用窗口函数来表示。首先导入所需的函数:

from pyspark.sql.functions import sum as sum_, lag, col, coalesce, lit
from pyspark.sql.window import Window

下一步定义一个窗口:

w = Window.partitionBy("user_id").orderBy("timestamp")

标记每组的第一行:

is_first = coalesce(
  (lag("actions", 1).over(w) != col("actions")).cast("bigint"),
  lit(1)
)

定义order:

order = sum_("is_first").over(w)

并将所有部分合并为一个聚合体:

(df
    .withColumn("is_first", is_first)
    .withColumn("order", order)
    .groupBy("user_id", "actions", "order")
    .count())

如果将df定义为:

df = sc.parallelize([
    (217498, 100000001, 'A'), (217498, 100000025, 'A'), (217498, 100000124, 'A'),
    (217498, 100000152, 'B'), (217498, 100000165, 'C'), (217498, 100000177, 'C'),
    (217498, 100000182, 'A'), (217498, 100000197, 'B'), (217498, 100000210, 'B'),
    (854123, 100000005, 'A'), (854123, 100000007, 'A')
]).toDF(["user_id", "timestamp", "actions"])

并按user_idorder排序,你会得到:

+-------+-------+-----+-----+ 
|user_id|actions|order|count|
+-------+-------+-----+-----+
| 217498|      A|    1|    3|
| 217498|      B|    2|    1|
| 217498|      C|    3|    2|
| 217498|      A|    4|    1|
| 217498|      B|    5|    2|
| 854123|      A|    1|    2|
+-------+-------+-----+-----+

恐怕使用标准的数据框窗口函数是不可能的。但是你仍然可以使用旧的RDD API groupByKey()来实现这个转换:

>>> from itertools import groupby
>>> 
>>> def recalculate(records):
...     actions = [r.actions for r in sorted(records[1], key=lambda r: r.timestamp)]
...     groups = [list(g) for k, g in groupby(actions)]
...     return [(records[0], g[0], len(g), i+1) for i, g in enumerate(groups)]
... 
>>> df_ini.rdd.map(lambda row: (row.user_id, row)) 
...     .groupByKey().flatMap(recalculate) 
...     .toDF(['user_id', 'actions', 'nf_of_occ', 'order']).show()
+-------+-------+---------+-----+
|user_id|actions|nf_of_occ|order|
+-------+-------+---------+-----+
| 217498|      A|        3|    1|
| 217498|      B|        1|    2|
| 217498|      C|        2|    3|
| 217498|      A|        1|    4|
| 217498|      B|        2|    5|
| 854123|      A|        2|    1|
+-------+-------+---------+-----+

相关内容

  • 没有找到相关文章

最新更新