检查系统是否连接到Kafka



我需要在Java中编写烟雾测试,该测试验证了系统是否连接到kafka,

有人有任何想法吗?我找到了这篇文章:

如何检查Kafka服务器是否正在运行?

,但是从Java代码中做太复杂了,我认为这不是我应该使用的方向。

预先感谢。

我也有同样的问题,我不想在没有任何答案的情况下离开这个问题。我读了很多有关如何检查连接的信息,发现的大多数答案都是检查与ZK的连接,但我真的想直接与Kafka服务器检查连接。

我所做的是创建一个简单的 kafkaconsumer ,并使用 listTopics()列出所有主题。如果连接成功,那么您将获得一些回报。否则,您将获得TimeoutException

  def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
    props.put("group.id", kafkaParams.get("group.id").get.toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    simpleConsumer.listTopics()
  }

然后,您可以将此方法包裹在try-catch句子中以捕获异常。

编辑:这是针对Kafka的古老的。请勿在2023年使用它:)


您可以通过使用此操作检查服务器是否正在运行:

ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$);
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
if (brokers.isEmpty()) {
    // No brokers available
} else {
    // There are brokers available
}

相关内容

  • 没有找到相关文章

最新更新