我有许多IoT设备,可以通过消息向Kafka主题报告事件,并且我定义了一个聚合器来从这些事件中更新设备状态。
我想做的就是能够将输入流连接到聚合器在聚合更新状态之前输出的ktable,也就是说,我想将事件与当前状态进行比较,如果它们与某个谓词匹配,请进行一些处理,然后更新状态。
我已经尝试使用StreamsBuilder#addStateStore
创建状态商店,但是该方法返回了streamsbuilder,并且似乎并没有为我提供一种将其转换为KTable的方法。
我尝试使用StreamsBuilder#aggregate
生产的KTable加入输入流,但这并不能做我想要的,因为它仅在聚集后的KTable中给我带来的值,我喜欢它在聚集之前运行。
// this is fine, but it returns a StreamsBuilder and I don't see how to get a KTable out of it
streamsBuilder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(deviceStateAggregator),
Serdes.String(),
Serdes.String()
)
);
// this doesn't work because I only get doThingsBeforeStateUpdate called after the state is updated by the DeviceStateAggregator
KTable<String, DeviceState> deviceTable = deviceEventKStream
.groupByKey(Serialized.with(Serdes.String(), new deviceEventSerde()))
.aggregate(
() -> null,
new DeviceStateAggregator(),
Materialized.<String, DeviceState>as(stateStoreSupplier)
.withValueSerde(deviceStateSerde)
);
deviceEventKStream.join(deviceTable, (event, state) -> doThingsBeforeStateUpdate(event, state));
我希望能够利用流DSL来检查某些先决条件,然后在汇总器更新状态之前,但似乎不可能。我目前正在探索使用处理器的想法,或者只是扩展我的Devicestateaggregator也可以完成所有预种群处理,但这对我来说感到很尴尬,因为这迫使聚集迫使聚集在关心似乎并不是没有的问题作为聚合的一部分合理地做。
也就是说,我想将事件与当前状态进行比较,如果它们与某个谓词匹配,请进行一些处理,然后更新状态。
如果我理解您的问题,尤其是正确的报价,那么我会遵循您的想法使用处理器API来实现此问题。您将需要实现Transformer
(因为您希望它输出数据,而不仅仅是阅读(。
作为您可以用作起点的示例应用程序,我建议您查看https://github.com/confluentinc/kafka-streams-examples上的MixAndMatch DSL + Processor API
和CustomStreamTableJoin
示例。第二个示例显示,尽管对于不同的用例,但是如何在处理器API中使用状态时进行自定义"如果这样"逻辑,此外,它还涵盖了JOIN功能,这也是您想做的事情。<<<</p>
希望这会有所帮助!