Spring Boot:Kafka健康指标



我有下面这样的东西,效果很好,但我更喜欢在不发送任何消息的情况下检查运行状况(不仅仅是检查套接字连接(。我知道Kafka有一些类似KafkaHealthIndicator的开箱即用的东西,有人有使用它的经验或例子吗?

public class KafkaHealthIndicator implements HealthIndicator {
private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);
private KafkaTemplate<String, String> kafka;
public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
this.kafka = kafka;
}
@Override
public Health health() {
try {
kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
return Health.down(e).build();
}
return Health.up().build();
}
}

为了触发健康指示器,请从其中一个未来对象中检索数据,否则即使Kafka关闭,指示器也是UP!!!

当Kafka未连接时,future.get((抛出一个异常,该异常反过来设置该指示符down

@Configuration
public class KafkaConfig {
@Autowired
private KafkaAdmin kafkaAdmin;
@Bean
public AdminClient kafkaAdminClient() {
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
@Bean
public HealthIndicator kafkaHealthIndicator(AdminClient kafkaAdminClient) {
final DescribeClusterOptions options = new DescribeClusterOptions()
.timeoutMs(1000);
return new AbstractHealthIndicator() {
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
DescribeClusterResult clusterDescription = kafkaAdminClient.describeCluster(options);
// In order to trip health indicator DOWN retrieve data from one of
// future objects otherwise indicator is UP even when Kafka is down!!!
// When Kafka is not connected future.get() throws an exception which 
// in turn sets the indicator DOWN.
clusterDescription.clusterId().get();
// or clusterDescription.nodes().get().size()
// or clusterDescription.controller().get();
builder.up().build();
// Alternatively directly use data from future in health detail.
builder.up()
.withDetail("clusterId", clusterDescription.clusterId().get())
.withDetail("nodeCount", clusterDescription.nodes().get().size())
.build();
}
};
}
}

使用AdminClient API通过描述将要交互的集群和/或主题来检查集群的运行状况,并验证这些主题是否具有所需数量的insync副本,例如

Kafka有类似于KafkaHealthIndicator的开箱即用

没有Spring的Kafka集成可能

最新更新