我是Kafka的新手。试图实施消费者和生产者类以发送和接收消息。需要为两个类配置bootstrap.servers
,这是经纪人IP和端口列表,由,
分隔。例如,
producerConfig.put("bootstrap.servers",
"172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
由于应用程序将在集群的主节点上运行,因此它应该能够从ZooKeeper
检索经纪人信息,就像Kafka的答案:从Zookeeper获取经纪人主机。
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
List<String> ids = zk.getChildren("/brokers/ids", false);
for (String id : ids) {
String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
System.out.println(id + ": " + brokerInfo);
}
}
但是,这个Brokerinfo的形式是这样: {" JMX_PORT": - 1," TIMESTAMP":" 1428512949385","主机":" 192.168.0.11","版本":1," port":9093}
在同一篇文章中,另一篇文章建议为每个经纪人获取连接字符串的方法,并与逗号一起加入。
for (String id : ids) {
String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
if (broker != null) {
brokerList.add(broker.connectionString());
}
}
如果此Broker
类来自org.apache.kafka.common.requests.UpdateMetadataRequest.Broker
,则没有方法createBroker
和connectionString
。
找到了另一个类似的帖子,使经纪人的列表动态。但是它没有说如何从host
和port
等经纪信息中获取属性。我可能可以为JSON像String一样为JSON编写解析器来提取它们,但我怀疑还有更多的Kafka本地方法可以做到这一点。有什么建议吗?
编辑:我意识到Broker
类来自kafka.cluster.Broker
。仍然没有方法connectionstring()
。
您可以使用zkutils检索集群中的所有经纪信息,如下所示:
ZkUtils zk = ZkUtils.apply("zkHost:2181", 6000, 6000, true);
List<Broker> brokers = JavaConversions.seqAsJavaList(zk.getAllBrokersInCluster());
for (Broker broker : brokers) {
//assuming you do not enable security
System.out.println(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host());
}
zk.close();