我想获得有关 Apache Storm 和 kafka 设置初始设置的帮助。
我能够将拓扑提交到风暴集群,但在风暴 ui 中出现低于错误。
Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)
我的代码片段如下。
// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot = "/kafka-cluster-1/brokers/topics";
String topicName = "myfirsttopic";
/* ****************************************************************** */
/* Topology configuration variable */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt */
Integer boltParalismHint = 1;
Integer spoutParalismHint = 1;
/* ****************************************************************** */
/* Build kafka consumer spout */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );
// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );
spoutConfig.ignoreZkOffsets = true;
// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
我已经引用了该文档并将忽略ZkOffsets设置为true。
如果要强制喷口忽略任何消费者状态 存储在ZooKeeper中的信息,那么您应该设置参数 KafkaConfig.ignoreZkOffsets to true
然而,从日志中,卡夫卡喷口似乎正在读取动物园管理员的偏移量。
由于它是初始设置,我如何停止来自动物园管理员的风暴读取偏移?
我使用以下版本。
- 阿帕奇风暴 1.2.1
- 阿帕奇卡夫卡 kafka_2.12-1.1.0
我没有做任何特别的事情,但在以下情况下,错误似乎没有出现在风暴UI中。
- 在 Kafka 中创建主题
- 确保 brokerZkPath 存在于 Zookeeper 中(Path to brokers 目录。在我的情况下/kafka-cluster-1/brokers(
- 确保 zkRootPath 存在于 Zookeeper (主题目录的路径。在我的情况下/kafka-cluster-1/brokers/topics(
- 将拓扑提交到风暴