kafka服务器配置有一个路径,该路径跟随端口号(来自server.properties)
zookeeper.connect=xxxxx007:2181/kafka
java生产者代码:
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "xxxxx007:9092");
如果经纪人省略/kafka ,则生产者填充主题
当代理包含/kafa 时,生产者会得到numberFormatException
Properties props = new Properties();
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "xxxxx007:9092/kafka");
java.lang.NumberFormatException: For input string: "9092/kafka"
如果zookeeper连接包含/kafka ,则java使用者挂起(不返回任何数据)
Properties props = new Properties();
props.put("zookeeper.connect", "xxxxx007:2181/kafka");
如果zookeeper连接省略/kafa-,则java使用者将获得异常
Properties props = new Properties();
props.put("zookeeper.connect", "xxxxx007:2181");
Exception in thread "main" kafka.common.ConsumerRebalanceFailedException: group1_BFTSLBHW0000RGU-1397591737558-f75b6658 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:209)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:92)
at kafka.examples.TitaniumConsumer.main(TitaniumConsumer.java:73)
指定这个zookeeper路径的目的是为了让特定集群的kafka中可用的所有数据都显示在这个特定路径下
请注意,在启动代理之前,您必须自己创建此路径,并且使用者必须使用相同的连接字符串。