风暴消息失败



最近我遇到了一个非常奇怪的问题。风暴群集有 3 台计算机。拓扑结构是这样的,卡夫卡喷口 A -> 螺栓 B -> 螺栓 C。我已经删除了每个 bolt 中的所有元组,即使内部 bolt 可能会抛出异常(在 bolt 执行方法中,我尝试捕获所有异常,最后确认元组)。 但在这里发生了奇怪的事情。我打印了壶嘴的日志,在一台机器上,所有被喷口卡住的元组,但在另外两台机器上,几乎所有的元组都失败了。60 秒后,元组一次又一次地重播。"几乎"表示在开始时,其他 2 台计算机上的所有元组都失败了。一段时间后,2 台计算机上有少量元组。

元组绝对会因为超时而失败。但我真的不知道为什么他们超时了。根据我打印的日志,我真的很确定每个 bolt 中执行方法末尾的所有元组都卡住了。所以我想知道为什么一些元组在 2 台机器上失败了。

我能做些什么来找出拓扑或风暴集群的问题吗?真的很感谢,希望得到您的回复。

您的问题与 KafkaSpout 在 StormTopology 中对背压的处理有关。

您可以通过在拓扑配置中设置 maxSpoutPending 值来处理 KafkaSpout 的背压,

Config config = new Config();
config.setMaxSpoutPending(200); 
config.setMessageTimeoutSecs(100);
StormSubmitter.submitTopology("testtopology", config, builder.createTopology());

maxSpoutPending 是在给定时间拓扑中可以挂起确认的元组数。设置此属性将使 KafkaSpout 不再消耗来自 Kafka 的任何数据,除非未确认的元组计数小于 maxSpoutPending 值。

此外,请确保您可以微调 Bolt 以使其尽可能轻量级,以便在元组超时之前得到确认。

最新更新