如何首次使用 apache kafka 集成部署风暴核心拓扑



我想获得有关 Apache Storm 和 kafka 设置初始设置的帮助。

我能够将拓扑提交到风暴集群,但在风暴 ui 中出现低于错误。

Unable to get offset lags for kafka. Reason: java.lang.IllegalArgumentException: zk-node '/kafka-cluster-1/brokers/topics/myfirsttopic/aadb3eb4-2224-4c18-b8ad-6959a1c9f607' dose not exists. at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOldConsumerOffsetsFromZk(KafkaOffsetLagUtil.java:387) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:268) at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)

我的代码片段如下。

// Kafka consumer client depends on Zookeeper when finding kafka nodes.
// Zookeeper Host List
String zkConnString = "localhost:2181";
String brokerZkPath = "/kafka-cluster-1/brokers";
String zkRoot       = "/kafka-cluster-1/brokers/topics";
String topicName    = "myfirsttopic";
/* ****************************************************************** */
/* Topology configuration variable                                    */
/* ****************************************************************** */
/* the number of tasks that should be assigned to execute this bolt   */
Integer boltParalismHint  = 1;
Integer spoutParalismHint = 1;
/* ****************************************************************** */
/* Build kafka consumer spout                                         */
/* ****************************************************************** */
// Build zookeeper instance
BrokerHosts hosts = new ZkHosts( zkConnString, brokerZkPath );
// Build configuration instance for Spout
SpoutConfig spoutConfig = new SpoutConfig( hosts, topicName, zkRoot + "/" + topicName , UUID.randomUUID().toString() );
spoutConfig.ignoreZkOffsets = true;
// Build Multischeme instance
spoutConfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
// Build Kafka spout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

我已经引用了该文档并将忽略ZkOffsets设置为true

如果要强制喷口忽略任何消费者状态 存储在ZooKeeper中的信息,那么您应该设置参数 KafkaConfig.ignoreZkOffsets to true

然而,从日志中,卡夫卡喷口似乎正在读取动物园管理员的偏移量。

由于它是初始设置,我如何停止来自动物园管理员的风暴读取偏移?

我使用以下版本。

  • 阿帕奇风暴 1.2.1
  • 阿帕奇卡夫卡 kafka_2.12-1.1.0

我没有做任何特别的事情,但在以下情况下,错误似乎没有出现在风暴UI中。

  1. Kafka 中创建主题
  2. 确保 brokerZkPath 存在于 Zookeeper 中(Path to brokers 目录。在我的情况下/kafka-cluster-1/brokers(
  3. 确保 zkRootPath 存在于 Zookeeper (主题目录的路径。在我的情况下/kafka-cluster-1/brokers/topics(
  4. 将拓扑提交到风暴

最新更新