我当前的代码使用 InMemoryKeyValueStore,它避免了对磁盘的任何持久性。我想使用 Stores#persistentKeyValueStore。我当前的代码如下:
static StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> makeStoreBuilder(
final String storeKey,
final Serde<LinkedList<StoreItem>> valueSerde,
final boolean loggingDisabled) {
final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeKey), Serdes.String(), valueSerde);
return storeBuilder;
}
对于我使用Stores#persistentKeyValueStore,我是否只需像这样更改调用:
final StoreBuilder<KeyValueStore<String, LinkedList<StoreItem>>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeKey), Serdes.String(), valueSerde);
我可以知道这是否是改变这种情况的正确方法,以及它是否会产生任何影响吗?我对 Kafka 和 Streams API 非常陌生,希望能得到一些见解。
谢谢。
是的,使用 Stores.persistentKeyValueStore(topicName( 会将状态持久化到磁盘。请确保:
- 您分配了足够的磁盘空间
- 为数据设置适当的保留期
我看到您禁用了日志记录,这是故意的吗?