Spring Kafka,识别容器是否为批处理模式


commandProcessors-in-0:
destination: internal-command-processor
consumer:
max-attempts: 1
group: command-processor-group
retrieveCohort-in-0:
destination: internal-retrieve-cohort
consumer:
max-attempts: 1
batch-mode: true
group: retrieve-cohort-group

我输入了不同的类似消费者的配置,其中一个可能将批处理模式设置为true,另一个则不然。

在我的ListenerContainerCustomizer中,我想知道他们是否将批处理模式设置为true,这可能吗。

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer( ){
return (container, dest, group) -> {
if (dest.equals("internal-generate-stop-reason")) {
container.setBatchErrorHandler(new RetryingBatchErrorHandler(new FixedBackOff(5000L, 2L),
new DeadLetterPublishingRecoverer(kafkaTemplate(),
(rec, ex) -> new TopicPartition("error-dlq", rec.partition()))));
} else {
System.out.println(dest+" => "+container.getAssignmentsByClientId());
}
};
}

没有办法知道这一点;除非使用group/dest属性。

然而,对于3.2.x版本(cloud 2021.0.x,Spring for Apache Kafka 2.8.x(,现在有了一个CommonErrorHandler,它可以处理批处理和记录侦听器;CCD_ 3和CCD_。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#eh-摘要

相关内容

  • 没有找到相关文章

最新更新