无法自定义 kafkaStreams globalstaterestorelistener



这是我的配置类,但是当我删除状态并重新启动时,它从更改日志主题构建时,我在日志中看不到任何消息。

生成安装程序 kafkastream 的步骤:

  1. 写入 kafka 主题,kstream 在本地读取和存储状态。
  2. 我停止应用程序并从更改主题构建 kafkastream
  3. 它仍然显示默认日志。
[-StreamThread-2] o.a.k.s.p.i.StoreChangelogReade stream-thread [foobar-91eae487-939e-439a-bd5f-c918c1f13145-StreamThread-2] Finished restoring changelog foobar-test-avro-leg-changelog-1 to store test-avro-leg with a total number of 66718 records
@EnableAutoConfiguration
@Slf4j
public class SpringKafkaStreamConfig {
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(){
return factoryBean -> {
List< StreamsBuilderFactoryBean.Listener
> out = factoryBean.getListeners();
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
java.util.Date start = null;
java.util.Date stop = null;
@Override
public void onRestoreStart(TopicPartition topicPartition,String storeName,long startingOffset,long endingOffset) {
start = Time.from(Instant.now());
log.info("Restarting the building of the following " +
"state store: {} " +
"starting " +
"at offset: {} at the this time: {}",
storeName,
startingOffset,Time.from(Instant.now()));
}
@Override
public void onBatchRestored(TopicPartition topicPartition,String storeName,long batchEndOffset,long numRestored) {
}
@Override
public void onRestoreEnd(TopicPartition topicPartition,
String storeName,long totalRestored) {
stop = Time.from(Instant.now());
log.info("State has completed building at this " +
"time: {} and restored for the " +
"following records: {}",
stop,totalRestored);
}
});
}
});
};
}
}

StreamsBuilderFactoryBeanCustomizer是一个 Boot 类,用于自定义 Spring Boot 配置的单个SBFB

绑定器不使用该工厂 Bean,因为每个绑定都需要一个新的 bean。

Spring Cloud Stream 改用 spring-kafka 提供的StreamsBuilderFactoryBeanConfigurer(如果应用程序上下文中恰好有一个)。

相关内容

  • 没有找到相关文章

最新更新