在 Kafka 流处理中聚合时使用提供的主题进行更改日志和重新分区



我正在使用Kafka流处理来聚合来自源对象的数据。

@Bean
public java.util.function.Consumer<KStream<String, SourceObject>> processSourceObject() {
Serde<SourceObject> SourceObjectSerde = new JsonSerde<>(SourceObject.class);
Serde<AgrregatedObject> AgrregatedObjectSerde = new JsonSerde<>(AgrregatedObject.class);
return input -> input.map((key, value) -> new KeyValue<String, SourceObject>(value.uniques(), value))
.groupByKey(Grouped.with(Serdes.String(), SourceObjectSerde))
.aggregate(AgrregatedObject::new, (uniques, sourceObject,
destinationList) -> new SourceObjectUpdater().apply(sourceObject, destinationList),
Materialized.<String, AgrregatedObject>as(Stores.inMemoryKeyValueStore("custome-snapshots")).withKeySerde(Serdes.String()).withValueSerde(AgrregatedObjectSerde))
.toStream().foreach((foo, bar) -> process);
}

在运行此应用程序时,以及提供的主题来处理SourceObject,它会自动创建另外两个主题

  1. 进程源对象-应用程序 ID-数据-快照-更改日志
  2. 进程源对象-应用程序 ID-数据-快照-重新分区

出于某些原因,我想使用现有主题而不是使用这两个主题。在哪里进行更改以提供预定义主题的名称,以用于我的应用程序的更改日志重新分区数据?

这取决于您使用的版本。从Apache Kafka 2.4开始,Streams API允许命名所有运算符/处理器,这些名称用于重新分区和更改日志主题。

但是,所有内部主题始终以<application.id>-为前缀,并以-repartition-changelog为后缀 - 因此您只能设置部分主题名称。

例如,您可以使用Grouped.as("myName")来设置重新分区主题的名称。

相关内容

  • 没有找到相关文章

最新更新