我的代码当前正在使用InMemoryKeyValueStore,它避免了对磁盘或kafka的任何持久性。我想使用 rocksdb (Stores.persistentKeyValueStore(,以便应用程序将从磁盘重新加载状态。我正在尝试实现这一点,我对 Kafka 和流 API 非常陌生。希望得到帮助,我可以做出改变,同时我仍然试图理解我的东西。
我尝试在这里创建状态存储:
StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> store =
Stores.<String, LinkedList<StoreItem>>keyValueStoreBuilder(Stores.persistentKeyValueStore(storeKey), Serdes.String(), valueSerde);
如何向流生成器注册它?
使用 inMemoryKeyValueStore 的现有代码:
static StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> makeStoreBuilder(
final String storeKey,
final Serde<LinkedList<StoreItem>> valueSerde,
final boolean loggingDisabled) {
final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeKey), Serdes.String(), valueSerde);
return storeBuilder;
}
我需要确保流应用程序在每次重新启动时都不会丢失日志主题中的现有消息。
如何向流生成器注册它?
通过调用StreamsBuilder#addStateStore()
.
https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addStateStore-org.apache.kafka.streams.state.StoreBuilder-
请参阅端到端演示应用程序 https://github.com/confluentinc/kafka-streams-examples StateStoresInTheDSLIntegrationTest
。
使用持久存储作为内存中存储。商店负责其余的工作,您无需担心加载数据等。你只是使用它。