出于我自己的目的,如果Kafka消费者无法连接到代理,我需要Spring Boot应用程序停止。我的意思是,当Kafka消费者试图池消息时,我们可以看到以下日志:
[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Bootstrap broker localhost:9094 (id: -1 rack: null) disconnected
[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Connection to node -1 (localhost/127.0.0.1:9094) could not be established. Broker may not be available.
当topic或broker不可用时,这是标准行为。因此,应用程序将不会停止。但是我需要。
我试图添加以下属性,但它不工作:
spring.kafka.consumer.fetch-max-wait=1000
spring.kafka.admin.fail-fast=true
spring.kafka.session.timeout.ms=1000
在一般情况下,我想得到这样的行为:如果消费者不能连接-关闭应用程序
- Spring Boot version: 2.3.8.RELEASE
- 卡夫卡:spring-kafka-starter
Kafka轮询示例:
consumer.poll(Duration.ofMinutes(5));
作为一种方法-我们可以从上面的注释中使用。或者我刚刚基于以下代码创建了验证,它可以工作
public void validate() {
try {
consumer.listTopics(Duration.ofSeconds(10));
} catch (TimeoutException e) {
logger.error("Topics doesn't exist OR unavailable broker");
System.exit(1);
}
}
我的解决方案:
@Bean
@Profile("local")
public Boolean localKafkaAvailabilityValidator() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(kafkaProperties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(true);
var future = CompletableFuture.runAsync(kafkaAdmin::clusterId);
try {
future.get(2, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Kafka is not available. Please start it before starting the application.", e);
}
return true;
}