正在从转换中的数据更新全局存储



我目前有一个简单的拓扑结构:

KStream<String, Event> eventsStream = builder.stream(sourceTopic);
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);

我的事件有时有一个键/值对,而其他时候只有键。我希望能够将值添加到那些缺少值的事件中。我可以在本地状态存储中正常工作,但当我添加更多任务时,有时键/值事件和值事件在不同的线程中,因此它们不会正确更新。

我想使用全局状态存储,但当新的键/值对出现时,我很难弄清楚如何更新全局存储。我用以下代码创建了一个全局状态存储:

builder.addGlobalStore(stateStore, "global_store", Consumed.with(Serdes.String(), Serdes.String()), new ProcessorSupplier<String, String>() {
@Override
public Processor<String, String> get() {
return new Processor<String, String>() {
private ProcessorContext context;
@Override
public void init(final ProcessorContext processorContext) {
this.context = processorContext;
}
@Override
public void process(final String key, final String value) {
context.forward(key, value);
}
@Override
public void close() {
}
};
}
});

据我所知,它正在发挥作用,但由于该主题中没有数据,我不确定。

所以我的问题是如何从transformValues内部更新全局存储?store.put()失败,错误为全局存储为只读。

我在Kafka Streams上找到了Write to GlobalStateStore,但公认的答案只是说更新底层主题,但我不知道该怎么做,因为主题不在我的流中。

---编辑---

我根据已接受的答案中的#1更新了代码。我看到新的键/值对显示在global_store中。但是globalStore似乎没有看到新的密钥。如果我重新启动应用程序,它会用主题中的数据填充缓存,但直到我停止/启动应用程序之后,新的键才可见。

我在全局存储处理器的process(String, String)中添加了日志记录,它显示了正在处理的新密钥。有什么想法吗?

  1. 您只能在transformValues中获得对全局状态存储的真正访问权限,如果您想更新全局状态存储,是的,您必须将更新发送到全局状态存储中的底层输入主题,当使用此更新消息时,您的状态将更新值。这背后的原因是,全局状态存储填充在所有应用程序实例上,并使用此输入主题进行容错。可以通过分支拓扑来执行此操作:
KStream<String, Event> eventsStream = builder.stream(sourceTopic);
//processing message as normal
eventsStream.transformValues(processorSupplier, "nameCache")
.to(destinationTopic);
//this transform to the updated message to global state
eventsStream.transform(updateGlobalStateProcessorSupplier, "nameCache")
.to("global_store");
  1. 使用低级别API手动构建拓扑,因此您可以使用ProcessorContext.forward将消息转发到destinationTopic主题和global_state主题,以使用接收处理器的名称将消息转发给接收处理器节点

最新更新