有人知道如何使用Faust实现滑动窗口吗?
这个想法是计算一个键在10、30、60和300秒的窗口中出现的次数,但是我们需要在15秒或每次更新的基础上。
我有一个狡猾的解决方案,这似乎非常低效,我有一个滚动的15窗口,有效期为300秒,然后我使用delta()
方法将表中的所有旧值求和到当前值。它似乎可以处理来自6个源的消息,每个源以10条消息/秒的速度运行,但这是我们看到延迟之前的极限。显然,这是一种缓慢的方法,无法扩展,所以问题是如何在不需要KSQL或设置Spark集群以及Kafka集群的情况下实现这一点。如果可以的话,我们会尽量保持简单。
更复杂的是,我们非常希望过去24小时、1周、1个月和过去3个月的数据都是相同的……一切都在忙碌中。但也许我们只是要求太多了,没有一个专门的过程来处理每个输入。
下面是我的代码:
class AlarmCount(faust.Record, serializer='json'):
event_id: int
source_id: int
counts_10: int
counts_30: int
counts_60: int
counts_300: int
@app.agent(events_topic)
async def new_event(stream):
async for value in stream:
# calculate the count statistics
counts_10=0
counts_30=0
counts_60=0
counts_300=0
event_counts_table[value.global_id] += 1
for i in range(300):
if(i<=10):
counts_10+=event_counts_table[value.source_id].delta(i)
if(i<=30):
counts_30+=event_counts_table[value.source_id].delta(i)
if(i<=60):
counts_60+=event_counts_table[value.source_id].delta(i)
if(i<=300):
counts_300+=event_counts_table[value.source_id].delta(i)
await event_counts_topic.send(
value=EventCount(
event_id=value.event_id,
source_id=value.source_id,
counts_10=counts_10,
counts_30=counts_30,
counts_60=counts_60,
counts_300=counts_300
)
)
我想在所有窗口上进行迭代,以比较最后一个值与所有其他过去值的平均值/偏差/其他聚合。
- 类似
table[key].iter_windows()
- 且不循环所有
.delta(i)
和您一样,我将实现一个带有时间戳列表的表。如果列表太大,它将是次优的,因为changelog
将是胖的。我们应该只流式传输被修改的内容,而不是在每个事件上重复所有列表。
所以我将创建一个具有详细信息的短期列表和一个具有聚合的长期列表。然后,每个事件只更新短期列表。
所以似乎没有一个好的方法来做到这一点。
我发现的最佳解决方案是在表中存储每个id的时间戳列表,并在新事件上附加时间戳,然后删除过期的时间戳,然后返回长度作为另一个主题的新值。
唯一真正的问题是,它只捕获事件上每个时间框架的实际事件计数——而理想的情况是每个时间框架的每秒计数实时更新。但我不认为这是这个系统应该/可以用来做的——它是用来处理事件的,所以它需要一个事件。我们可以使用计时器函数每秒触发一次重新计数,但这会明显增加处理速度和吞吐量,而且由于警报的触发器是针对新事件的,所以这没有太大关系。有就好,不是必须的。
对于更长期的统计(我们这里说的是周和月),我们决定将所有事件写入数据库,然后定期(每10秒)执行类似的任务,查找新事件和过期事件,然后将聚合计数发送到Kafka主题进行额外处理。即使每秒有1000个事件,每10秒处理这些数据也只需要10-20毫秒,所以这是可以管理的,因为1000/s是一个噩梦般的场景,它只会发生一次,然后停止。