我在尝试通过 Kafka 流实现以下内容时遇到了一些麻烦:
- 在应用程序启动时,(压缩的(主题
alpha
加载到键值StateStore
映射中 - Kafka 流从另一个主题消费,使用 (.get( 上面的映射,最后在主题
alpha
中生成一条新记录 - 结果是内存中的映射应与基础主题对齐,即使流处理器重新启动也是如此。
我的方法如下:
val builder = new StreamsBuilderS()
val store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)
builder.addStateStore(store)
val loaderStreamer = new LoaderStreamer(store).startStream()
[...] // I wait a few seconds until the loading is complete and the stream os running
val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!
builder
.stream("another-topic")(Consumed.`with`(kSerde, vSerde))
.doMyAggregationsAndgetFromTheMapAbove
.transform(() => new StoreTransformer[K, V]("store"), "store")
.to("alpha")(Produced.`with`(kSerde, vSerde))
LoaderStreamer(store)
:
[...]
val builders = new StreamsBuilderS()
builder.addStateStore(store)
builder
.table("alpha")(Consumed.`with`(kSerde, vSerde))
builder.build
[...]
StoreTransformer
:
[...]
override def init(context: ProcessorContext): Unit = {
this.context = context
this.store =
context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}
override def transform(key: K, value: V): (K, V) = {
store.put(key, value)
(key, value)
}
[...]
。但我得到的是:
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.
在尝试获取存储处理程序时。
关于如何实现这一目标的任何想法?
谢谢!
不能在两个 Kafka Streams 应用程序之间共享状态存储。
根据文档: https://docs.confluent.io/current/streams/faq.html#interactive-queries 上述异常可能有两个原因:
-
本地 KafkaStreams 实例尚未准备就绪,因此尚无法查询其本地状态存储。
-
本地 KafkaStreams 实例已准备就绪,但特定的状态存储刚刚迁移到后台的另一个实例。
处理它的最简单方法是等到状态存储可查询:
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
}
完整的示例可以在confluent的github上找到。