Kafka Stream StateStore无限循环



我们有一个KStream应用程序,它使用内存中的KV StateStore,但禁用了更改日志。

String stateStoreName = "statestore-v1";
StoreBuilder<KeyValueStore<String, Event>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(stateStoreName), 
Serdes.String(), new JsonSerde<>(Event.class));
keyValueStoreBuilder.withLoggingDisabled();
streamsBuilder.addStateStore(keyValueStoreBuilder);

我们现在想要启用具有不同配置和不同名称的变更日志。

String stateStoreName = "statestore-v2";
StoreBuilder<KeyValueStore<String, Event>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(stateStoreName), 
Serdes.String(), new JsonSerde<>(Event.class));
Map<String, String> changelogConfig = new HashMap<>();
changelogConfig.put("retention.ms", "43200000"); // 12 hours
changelogConfig.put("cleanup.policy", "delete");
changelogConfig.put("auto.offset.reset", "latest");
keyValueStoreBuilder.withLoggingEnabled(changelogConfig);
streamsBuilder.addStateStore(keyValueStoreBuilder);

当我们运行应用程序时,我们进入了带有以下消息的无限循环:

2022-10-11 13:02:32.761 app=myapp INFO 54561 --- [-StreamThread-3]
o.a.k.s.p.i.StoreChangelogReader : stream-thread [myapp-StreamThread-3] 
End offset for changelog myapp-statestore-v2-changelog-4 cannot be found; 
will retry in the next time.
2022-10-11 13:02:32.761 app=myapp INFO 54561 --- [-StreamThread-3] 
o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=myapp-StreamThread-3-restore-consumer, groupId=null] 
Unsubscribed all topics or patterns and assigned partitions

变更日志主题似乎从未创建过。。。至少kafka-topics没有显示它。

我使用的是io.confluent包版本7.2.2-cs,我认为它可以翻译成Apache Kafka版本3.2.x

关于如何修复无限循环并创建变更日志主题,有什么想法吗?

谢谢!

无限循环是因为我们正在进行蓝/绿部署。我们了解到,如果使用StateStore更改任何内容(配置或禁用/重新启用变更日志(,我们就无法做到这一点。

我们只是完全关闭了旧版本,然后部署了新版本。效果很好。

另一种选择是使用OneKricketeer建议的kafka-streams-application-reset工具。

最新更新