我正在使用kafka流的函数式方法.我想通过检查线程状态来获取健康状态



我的问题与下面的问题类似。我想检查通过功能方法编码的Kstream应用程序的健康状况。Spring驱动器+ Kafka流-添加Kafka流状态到健康检查端点

在上面的链接中,答案是根据自动装配Kafka流给出的。我不能自动安装它,因为它给出了以下错误:

字段kafkaStreams.metrics。KafkaStreamsHealthIndicator需要一个类型为org.apache.kafka.streams的bean。KafkaStreams .

我尝试添加以下类作为上面解释的链接,但它给出了Kafka流自动装配的错误

`import org.apache.kafka.streams.KafkaStreams;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.stereotype.Component;
//Note that class name prefix before `HealthIndicator` will be camel-cased
//and used as a health component name, `kafkaStreams` here
@Component
@EnableKafkaStreams
public class KafkaStreamsHealthIndicator implements HealthIndicator {
//    StreamsBuilder streamsBuilder = new StreamsBuilder();
//if you have multiple instances, inject as Map<String, KafkaStreams>
//Spring will map KafkaStreams instances by bean names present in context
//so you can provide status details for each stream by name
@Autowired
private KafkaStreams kafkaStreams;

@Override
public Health health() {
KafkaStreams.State kafkaStreamsState = kafkaStreams.state();
// CREATED, RUNNING or REBALANCING
if (kafkaStreamsState == KafkaStreams.State.CREATED || kafkaStreamsState.isRunningOrRebalancing()) {
//set details if you need one
return Health.up().build();
}
// ERROR, NOT_RUNNING, PENDING_SHUTDOWN,
return Health.down().withDetail("state", kafkaStreamsState.name()).build();
}
}`

KafkaStreams不是spring-kafka模块中定义的bean,相反,您应该使用StreamsBuilderFactoryBean,它在KafkaStreams初始化后公开它们,并提供其他spring和kafka相关的生命周期方法。

你也可以钩到kafka流状态监听器,从那里改变健康状态。

相关内容

  • 没有找到相关文章

最新更新