Spring-Cloud-stream进程KAFKA消息仅在申请启动上实现的州存储才被完全填充并准备就绪之后



参考此解决方案,我的spring-cloud-stream application.yml文件具有以下配置:

#application.yml
spring.cloud.stream.bindings.input:
  destination: my-topic-name
  contentType: application/json
  consumer:
    useNativeDecoding: false
spring.cloud.stream.kafka.streams.bindings.input:
  consumer:
    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
    materializedAs: my-store

在我的主要应用程序中,带有@enableBinding注释的类和@StreamListener注释的方法内,我使用kafka stream dsl和处理器API集成来访问已在应用程序启动时在申请启动上实现的状态商店,这是因为在应用程序启动时都存在物料application.yml文件。

ReadOnlyKeyValueStore<Object, String> store;
 input.process(() -> new Processor<Object, Product>() {
                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");

                }
                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }
                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");

问题是,在应用程序启动时,州商店尚未完全填充且尚未准备就绪(例如,空态商店(,但消息仍然较早到达,并且正在通过Kafka流拓扑处理处理意外结果。

我们如何确保在第一个申请启动时,所有或特定的(如果可能的话可以定义(已指示通过在Application中使用applicatizatizAS实现的状态商店。@StreamListener内部定义的任何流处理拓扑都开始处理传入消息。我们可以强迫消息的流处理等待,直到首次申请启动时州商店已完全填充?

我试图通过修改Spring-Cloud-Stream样本之一并在此处推动修改版本来复制问题。有关此的更多详细讨论也可以在此处找到

似乎您需要与Globalktable(State Store(一起加入流,而不是使用过程接口。这是kStream加入Globalktable。请尝试。

相关内容

  • 没有找到相关文章

最新更新