storm.kafka.UpdateOffsetException-不透明三叉戟kafka Spout问题



我使用OpaqueTridentKafkaSpout的三叉拓扑。

我正在使用的TridentKafkaConfig的代码片段:-

OpaqueTridentKafkaSpout kafkaSpout = null;
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(new ZkHosts("xxx.x.x.9:2181,xxx.x.x.1:2181,xxx.x.x.2:2181"), "topic_name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.fetchSizeBytes = 147483600;
kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);

我从其中一个工人那里得到了这个运行时异常:-

java.lang.RuntimeException:处的storm.kafka.UpdateOffsetExceptionbacktype.storm.utils.DisruptorQueue.consumerBatchToCursor(Disruptor Queue.java:135)在backtype.storm.utils.DisruptorQueue.consumerBatchWhenAvailable(Disruptor Queue.java:106)在backtype.storm.disruptor$consume_batch_when_available.ioke(disruptor.clj:80)在backtype.storm.daemon.executor$fn_5694$fn5707$fn5758.invoke(executor.clj:819)位于的backtype.storm.util$async_loop$fn545.uinvoke(util.clj:479)clojure.lang.AFn.run(AFn.java:22)java.lang.Thread.run(Thread.java:745)由以下原因引起:位于的storm.kafka.UpdateOffsetExceptionstorm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186)storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafka Emitter.java:132)在storm.kafka.treedent.TreedentKafkaEmitter.doEmitNewPartitionBatch(TridentKafka Emitter.java:113)在storm.kafka.treedent.TreedentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafka Emitter.java:72)在storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)在storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafka Emitter.java:46)在storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafka Emitter.java:204)在storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafka Emitter.java:194)在storm.trident.poutput.OpaquePartitionedTridentSpoutExecutiator$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)在storm.trident.putter.TridentSpoutExecutor.execute(TridentSpooutExecutor.java:82)在storm.trident.topology.TridentBoltExecutor.execute(TridentBoltTexecutor.java:370)在backtype.storm.daemon.executor$fn5694$tuple_action_fn5696.invoke(executor.clj:690)在backtype.storm.daemon.executor$mk_task_receiver$fn5615.invoke(executor.clj:436)在backtype.storm.disruptor$clojure_handler$reify_5189.onEvent(disruptor.clj:58)在backtype.storm.utils.DisruptorQueue.consumerBatchToCursor(Disruptor Queue.java:127)…还有6个

根据一些帖子,我已经尝试过设置spootConfig:-poutConfig.maxOffsetBehind=长.MAX_VALUE;poutConfig.startOffsetTime=kafka.api.FoffsetRequest.ErliestTime();我的Kafka保留时间默认为128小时,即7天,Kafka生产者每秒向Storm/Trident拓扑发送6800条消息。我浏览了大部分帖子,但似乎没有一个能解决这个问题。处理这个问题的最佳方法是什么?

我仍然不知道是什么导致了这个问题。但基本上我们并没有关闭风暴,动物园管理员和卡夫卡。这导致风暴拓扑失败,我们不得不拆除整个集群并重新构建它。更新到风暴0.10.0有助于解决其他一些问题。

最新更新