Kafka:在 x 时间内没有更新时更新密钥



在使用Kafka 时,有没有办法在 x 时间没有看到密钥后更新它?

类似的东西

records
.groupByKey
.windowedBy(
TimeWindows
.of(Duration.ofMinutes(5))
.grace(Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1))
).count()
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
).updateNotSeen(Duration.ofMinutes(30), (k) => (k, 0))

所以在这里,每当卡夫卡在30分钟后没有看到记录时,它就会发出一个新的记录。(由假设的更新完成。

在我的搜索中,我发现了这个悬而未决的问题,如果它存在,它允许我以某种方式做到这一点,但我不知道我现在会怎么做。

据我所知,这在DSL(Java,Scala)中是不可能的。

但是,在提供开箱即用的此类功能之前,您可以使用 Kafka Stream 的处理器 API 自行实现此类自定义功能。(例如,处理器 API 同样可用于实现自定义联接操作。在这种情况下,您将不使用表(这是仅DSL的抽象),而是使用状态存储(表由状态存储,fwiw支持),它支持从附加的ProcessorTransformer直接读写访问。处理器和转换器支持标点符号来安排定期操作,类似于cron。在此类计划操作期间,您可以检查由其记录键标识的任何记录在过去 30 分钟内是否未看到更新,然后采取相应措施。

此外,知道您可以将处理器 API 和 DSL(到目前为止您一直在使用)结合起来是非常有帮助的。 也就是说,您可以继续对大多数代码使用 DSL,并且仅在需要的时间和地点"插入"上述处理器/转换器(来自处理器 API)。

希望这有帮助!

最新更新