如何在Spring Cloud Stream应用程序中添加全局状态存储



我正试图在Spring Cloud Stream应用程序中配置一个全局状态存储,但失败了:

线程中的异常";svc-toa-app-data-integ-io-dev-fd2e1a47-0758-4b95-9b2e-022ca4df95ca-StreamThread-1";org.apache.kafka.streams.errors.TopologyException:无效拓扑:拓扑结构未知主题。如果相同的应用程序执行不同的拓扑。请注意,只有在所有运算符都按相同顺序添加的情况下,拓扑才是相同的。

根据官方文档,我添加了以下内容来配置状态存储:

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals(applicationId)) {
try {
final var keyValueStore = new KeyValueStoreBuilder<>(
Stores.inMemoryKeyValueStore(PRODUCT_INFO_STORE),
productInfoKeySerde(),
productInfoValueSerde(),
Time.SYSTEM
);
final StreamsBuilder streamsBuilder = factoryBean.getObject();
streamsBuilder.addGlobalStore(
keyValueStore.withLoggingDisabled(),
productInfoTopic,
Consumed.with(productInfoKeySerde(), productInfoValueSerde()),
ProductGlobalStateStoreProcessor::new
);
} catch (Exception e) {
e.printStackTrace();
}
}
};
}

我有一个处理器,我想在那里使用状态存储:

public class ProductGlobalStateStoreProcessor implements Processor<ProductInfoKey, ProductInfo, Void, Void> {
KeyValueStore<ProductInfoKey, ProductInfo> stateStore;
@Override
public void init(ProcessorContext<Void, Void> context) {
stateStore = context.getStateStore(ProductGlobalStateStoreConfiguration.PRODUCT_INFO_STORE);
}
@Override
public void process(Record<ProductInfoKey, ProductInfo> record) {
stateStore.put(record.key(), record.value());
}
}

我有点迷失了我所缺少的东西,我试图在属性中添加gktable绑定,但它不会改变任何东西。

我发现了我的问题:另一个实例使用相同的应用程序id运行。

由于我为全局状态存储添加了一个新的源节点,所以拓扑结构有所不同,这就是它无法启动的原因。

最新更新