如何在两个 Kafka 流之间使用持久化状态存储



我在尝试通过 Kafka 流实现以下内容时遇到了一些麻烦:

  • 在应用程序启动时,(压缩的(主题alpha加载到键值StateStore映射中
  • Kafka 流从另一个主题消费,使用 (.get( 上面的映射,最后在主题 alpha 中生成一条新记录
  • 结果是内存中的映射应与基础主题对齐,即使流处理器重新启动也是如此。

我的方法如下:

val builder = new StreamsBuilderS()
val store = Stores.keyValueStoreBuilder(
   Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)
builder.addStateStore(store)
val loaderStreamer = new LoaderStreamer(store).startStream()
[...] // I wait a few seconds until the loading is complete and the stream os running
val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!
builder
  .stream("another-topic")(Consumed.`with`(kSerde, vSerde))
  .doMyAggregationsAndgetFromTheMapAbove
  .transform(() => new StoreTransformer[K, V]("store"), "store")
  .to("alpha")(Produced.`with`(kSerde, vSerde))

LoaderStreamer(store)

[...]
val builders = new StreamsBuilderS()
builder.addStateStore(store)
builder
  .table("alpha")(Consumed.`with`(kSerde, vSerde))
builder.build
[...]

StoreTransformer

[...]
override def init(context: ProcessorContext): Unit = {
  this.context = context
  this.store = 
    context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}
override def transform(key: K, value: V): (K, V) = {
  store.put(key, value)
  (key, value)
}
[...]

。但我得到的是:

Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.

在尝试获取存储处理程序时。

关于如何实现这一目标的任何想法?

谢谢!

不能在两个 Kafka Streams 应用程序之间共享状态存储。

根据文档: https://docs.confluent.io/current/streams/faq.html#interactive-queries 上述异常可能有两个原因:

  • 本地 KafkaStreams 实例尚未准备就绪,因此尚无法查询其本地状态存储。

  • 本地 KafkaStreams 实例已准备就绪,但特定的状态存储刚刚迁移到后台的另一个实例。

处理它的最简单方法是等到状态存储可查询:

public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                              final QueryableStoreType<T> queryableStoreType,
                                              final KafkaStreams streams) throws InterruptedException {
  while (true) {
    try {
      return streams.store(storeName, queryableStoreType);
    } catch (InvalidStateStoreException ignored) {
      // store not yet ready for querying
      Thread.sleep(100);
    }
  }
}

完整的示例可以在confluent的github上找到。

相关内容

  • 没有找到相关文章

最新更新