我可以在kafka流应用程序的peek或filter或分支中进行一些有状态的操作吗



正如我们在kafka流文档中所知,peek、filter、branch是无状态操作吗?然而,我想在这个处理器中做一些有状态的操作吗?例如,我想做一些查询,并根据结果过滤消息,我能做到吗?

操作peek()filter()branch()本质上是无状态的。当你说:

我想做一些查询,并根据结果过滤消息

这取决于您想查询什么?可以(但不推荐(查询"外部"API。然而,它并没有内置的支持,而且有许多角落的情况需要考虑,以使其健壮。请注意,查询外部系统不会使操作有状态

如果要使用state,可以使用transform()(和同级(并构建自定义运算符。如果命名所有下游操作符(通过Named和类似操作(,则可以使用context.forward(..., To.child(...))来实现自定义分支。对于筛选,您可以返回null以不转发任何内容。

不确定有状态peek((的用途,但您也可以这样做。

根据使用情况,也可以通过流表联接或流全局表联接来实现"有状态过滤器"。

IMO,实现这一点的最佳方法是使用KStream#...join或使用处理器API使用表查找来访问底层状态存储(使用KStream#transformValues(。

你可以这样做,但代码会非常糟糕(不建议这样做(,但你只能在流状态从重新警报变为运行后对ReadOnlyKeyValueStore进行只读访问:

kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
ReadOnlyKeyValueStore<Object, Object> kvStore = kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
//assign this kvStore to some place so you can later using this referrer access this in filter or in peek
}
});

相关内容

  • 没有找到相关文章

最新更新