PySpark对零件进行结构化流式处理和过滤处理



我想评估Spark 2.4:中的流式(未绑定(数据帧

time          id   value
6:00:01.000   1     333
6:00:01.005   1     123
6:00:01.050   2     544
6:00:01.060   2     544

当id 1的所有数据进入数据帧,下一个id 2的数据到来时,我想对id 1的完整数据进行计算。但是我该怎么做呢?我想我不能使用窗口函数,因为我不知道每个id的提前时间。我也不知道除了流数据帧之外的其他来源的id。

我想到的唯一解决方案包括变量比较(内存(和while循环:

id_old = 0 # start value
while true:
id_cur = id_from_dataframe
if id_cur != id_old: # id has changed
do calulation for id_cur
id_old = id_cur

但我认为这不是正确的解决办法。你能给我一个提示或文档吗?因为我找不到例子或文档。

我使用水印和分组的组合来运行它:

import pyspark.sql.functions as F
d2 = d1.withWatermark("time", "60 second") 
.groupby('id', 
F.window('time', "40 second")) 
.agg(
F.count("*").alias("count"), 
F.min("time").alias("time_start"), 
F.max("time").alias("time_stop"), 
F.round(F.avg("value"),1).alias('value_avg'))

大多数文档只显示了按时间分组的基本内容,我只看到了一个带有另一个分组参数的示例,所以我把我的'id'放在那里。

最新更新