Kafka 流 - 如果在一段时间内未收到给定密钥的事件,如何发送警报



如果在一段时间内未收到给定键的主题中的事件,我需要发送警报。使用 KafkaStream 解决此用例的最佳方法是什么?

我试过了:

1)一个窗口由和一个抑制运算符:

stream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1000)).grace(Duration.ZERO))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.filter((k, v) -> v == 0)
.toStream()
.map((windowId, count) -> KeyValue.pair(windowId.key(), AlarmEvent.builder().build()))
.to(ALARMS, Produced.with(Serdes.String(), AlarmEvent.serde()));

但似乎窗口不会关闭,直到过期后发生事件,因此在定义的超时之后无法发送警报。

2)使用带有点器的处理器API,它似乎可以工作,但我只使用拓扑测试驱动程序advanceWallClockTime()进行了测试。不确定此 advanceWallClockTime() 是否会影响实时提前,或者仅在事件接收时更改,从而回退到问题 1)。

3)如果标点符号有效,我想在ValueTranformer中使用它,以从DSL拓扑中受益。但是,我遇到了如何在价值转换器中的标点符号实例下游转发事件?中描述的问题。无法从标点符号实例向下游发送事件。

4)最后,我有了一个想法,为每个分区定期(例如每秒)注入一些虚拟事件,以便人为地迫使内部时钟前进。这将允许我使用干净简单的DSL窗口并抑制运算符。

2)使用带有punctator的处理器API,它似乎可以工作,但我只使用TopologyTestDriver和advanceWallClockTime()进行了测试。不确定此 advanceWallClockTime() 是否会影响实时提前,或者仅在事件接收时更改,从而回退到问题 1)。

这是正确的做法。顾名思义,标点符号可以根据挂钟时间(即系统时间)触发。TopologyTestDriver模拟挂钟时间进行测试,但KafkaStreams将使用系统时间。

3)如果标点符号有效,我想在ValueTranformer中使用它,以从DSL拓扑中受益。但是,我遇到了如何在价值转换器中的标点符号实例下游转发事件?中描述的问题。无法从标点符号实例向下游发送事件。

您需要改用transform()ValueTransformer的标点符号不允许通过forward()发出数据,因为您可能会发出任何密钥,从而违反未修改密钥的协定。

4)最后,我有了一个想法,为每个分区定期(例如每秒)注入一些虚拟事件,以便人为地迫使内部时钟前进。这将允许我使用干净简单的DSL窗口并抑制运算符。

这也应该行得通。

最新更新