我有一个简单的流应用程序将一个主题作为输入流,并将键值转换为另一个主题,如下所示:
StoreBuilder<KeyValueStore<Long, CategoryDto>> builder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(CategoryTransformer.STORE_NAME),
Serdes.Long(), CATEGORY_JSON_SERDE);
streamsBuilder.addStateStore(builder)
.stream(categoryTopic, Consumed.with(Serdes.Long(), CATEGORY_JSON_SERDE))
.transform(CategoryTransformer::new, CategoryTransformer.STORE_NAME);
static class CategoryTransformer implements Transformer<Long, CategoryDto, KeyValue<Long, CategoryDto>> {
static final String STORE_NAME = "test-store";
private KeyValueStore<Long, CategoryDto> store;
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<Long, CategoryDto>) context.getStateStore(STORE_NAME);
}
@Override
public KeyValue<Long, CategoryDto> transform(Long key, CategoryDto value) {
store.put(key, value);
return KeyValue.pair(key, value);
}
@Override
public KeyValue<Long, CategoryDto> punctuate(long timestamp) {
return null;
}
@Override
public void close() {
}
}
在这里,我必须使用变压器,因为我需要获取存储并更新相关值。
问题是使用本地状态存储与仅将值放入ForeachAction
中的简单HashMap
之间有什么区别?
在这种情况下,使用本地国有商店有什么好处?
虽然它没有显示在你的代码中,但我假设你以某种方式读取和使用存储的状态。
使用简单的(在内存中)存储您的状态HashMap
使您的状态根本不持久,这意味着当发生以下任一情况时,您的状态将丢失(这些都没有什么不寻常的,假设它会经常发生):
- 您的流处理器/应用程序停止,
- 崩溃,或
- 由于重新平衡,部分迁移到其他地方(其他 JVM)。
非持久状态的问题在于,当发生上述任何一种情况时,kafka-streams 将在上次提交的偏移量处重新启动处理。因此,在崩溃/停止/重新平衡之前处理的所有记录都不会重新处理,这意味着当处理重新启动时,HashMap
的内容将为空。这当然不是你想要的。
另一方面,如果您使用提供的状态存储之一,kafka-streams 将确保,一旦处理在上面列出的任何中断后重新启动,状态将可用,就好像处理从未停止一样,而无需重新处理任何以前处理过的记录。
问题是使用本地状态存储和仅将值放入ForeachAction中的简单HashMap之间有什么区别?
如果输入主题未分区,并且运行 Streams 应用程序的单个实例,则本地状态 API 的价值并不大。在这种情况下,当然:您可以在处理器中使用HashMap
,或者如果您想在重新启动后幸存下来,则可以使用一些持久HashMap
。
本地存储的价值在主题分区时变得清晰,在运行 Streams 应用程序的多个实例时更清晰。在这种情况下,您需要与处理特定分区的处理器保持特定状态,并且该状态需要能够随处理器一起移动,以防它移动到不同的 Streams 实例。在这种情况下(又称规模),本地存储设施既必要又无价。想象一下,必须自己大规模地编排,而不是让这个设施成为核心平台(本地状态 API)的一部分。