卡夫卡为什么要更改商店名称



我的应用程序有问题。

代码:

KTable<Long, byte[]> table = stream.groupByKey().aggregate(() -> null , (key, oldVal, newVal) -> {
return newVal;
}, Materialized.<Long,byte[],KeyValueStore<Long,byte[]>>as("networkStore").with(longSerde, byteSerde));

这里,我将Store Name设置为networkStore,但当我列出Kafka主题时,Store的名称为network-service-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog

我想要的是:-商店的名称是networkStore,这样我以后可以从中读取。

当我现在尝试从商店阅读时,它会给我以下异常:

org.apache.kafka.streams.errors.InvalidStateStoreException:状态存储networkStore可能已迁移到另一个实例。网址:org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60(网址:org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1039(网址:com.maxflow.networkssevice.utils.NetworksServiceUtils.updateGraphForCompany(NetworksServiceUtils.java:41(网址:com.maxflow.networkssevice.consumer.NodesConsumer.run(NodesConsumer.java:99(在java.lang.Thread.run(Thread.java:748(

使用以下内容:

KTable<Long, byte[]> table = stream.groupByKey().aggregate(() -> null , (key, oldVal, newVal) -> {
return newVal;
}, Materialized.with(longSerde, byteSerde).as("networkStore"));

Materialized.as().with()正在用内部名称覆盖自定义名称。所以你应该在.with()之后调用.as()方法。您可以在此处阅读更多详细信息。

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Materialized.html#with-org.apache.kafka.commun.serialization.Serde

另一个选项是使用.withKeySerde().withValueSerde()方法以及自定义存储名称,如下所示。

Materialized.<Long,byte[],KeyValueStore<Long,byte[]>>as("networkStore").withKeySerde(longSerde).withValueSerde(byteSerde)

相关内容

  • 没有找到相关文章

最新更新