正如我们在kafka流文档中所知,peek、filter、branch是无状态操作吗?然而,我想在这个处理器中做一些有状态的操作吗?例如,我想做一些查询,并根据结果过滤消息,我能做到吗?
操作peek()
、filter()
和branch()
本质上是无状态的。当你说:
我想做一些查询,并根据结果过滤消息
这取决于您想查询什么?可以(但不推荐(查询"外部"API。然而,它并没有内置的支持,而且有许多角落的情况需要考虑,以使其健壮。请注意,查询外部系统不会使操作有状态。
如果要使用state,可以使用transform()
(和同级(并构建自定义运算符。如果命名所有下游操作符(通过Named
和类似操作(,则可以使用context.forward(..., To.child(...))
来实现自定义分支。对于筛选,您可以返回null
以不转发任何内容。
不确定有状态peek((的用途,但您也可以这样做。
根据使用情况,也可以通过流表联接或流全局表联接来实现"有状态过滤器"。
IMO,实现这一点的最佳方法是使用KStream#...join
或使用处理器API使用表查找来访问底层状态存储(使用KStream#transformValues
(。
你可以这样做,但代码会非常糟糕(不建议这样做(,但你只能在流状态从重新警报变为运行后对ReadOnlyKeyValueStore
进行只读访问:
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
ReadOnlyKeyValueStore<Object, Object> kvStore = kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
//assign this kvStore to some place so you can later using this referrer access this in filter or in peek
}
});