假设我想让一些转换'A'可配置。此转换使用状态存储管理某些状态,并且还需要重新分区,这意味着只有在配置后才会进行重新分区。现在,如果我以以下方式(或任何其他组合(运行应用程序 3 次(也可能是滚动升级(:-
-
转换"A"已禁用
-
转换"A"已启用
-
转换"A"已禁用
鉴于所有 3 次运行都使用相同的 Kafka 代理集群:-
-
如果启用了 EOS,EOS 保证是否存在于所有 3 次运行中?
-
如果未启用EOS,是否有可能导致消息丢失的情况(甚至至少一次都无法提供(?
拓扑代码,以更好地了解我正在尝试做什么:-
KStream<String, Cab> kStream = getStreamsBuilder()
.stream("topic_a", Consumed.with(keySerde, valueSerde))
.transformValues(() -> transformer1)
.transformValues(() -> transformer2, "stateStore_a")
.flatMapValues(events -> events);
mayBeEnrichAgain(kStream, keySerde, valueSerde)
.selectKey((ignored, event) -> event.getAnotherId())
.through(INTERMEDIATE_TOPIC_2, Produced.with(keySerde, valueSerde)) //this repartitioning will always be there
.transformValues(() -> transformer3, "stateStore_b")
.to(txStreamsConfig.getAlertTopic(), Produced.with(keySerde, valueSerde));
private <E extends Cab> KStream<String, E> mayBeEnrichAgain(final KStream<String, E> kStream,
final Serde<String> keySerde,
final Serde<E> valueSerde) {
if(enrichmentEnabled){ //repartitioning is configurable
return kStream.selectKey((ignored, event) -> event.id())
.through(INTERMEDIATE_TOPIC_1, Produced.with(keySerde, valueSerde))
.transformValues(enricher1)
.transformValues(enricher2);
}
else{
return kStream;
}
}
您不能简单地更改拓扑而不可能破坏它。
一般来说,很难说插入贯穿主题是否会首先破坏应用程序。
如果它没有中断,则在删除主题时可能会"丢失"数据,因为某些未处理的数据可能仍位于本主题中,并且在删除主题后,拓扑不会读取这些数据。
通常,如果将应用升级到更改拓扑结构的较新版本,则应干净地重置应用程序或使用新application.id
。