我已经实现了一个从Kafka队列读取消息的heron拓扑。因此,我的拓扑结构有一个kafka喷口和一个计算从队列中读取的消息数量的螺栓。
当我向kafka队列发送say10000
消息时,我可以看到在heron拓扑中的kafka喷口中接收到的所有消息,但很少有消息在螺栓处丢失。
以下是苍鹭的拓扑设置
Config config = Config.newBuilder()
.setUserConfig("topology.max.spout.pending", 100000)
.setUserConfig("topology.message.timeout.secs", 100000)
.setNumContainers(1)
.setPerContainerCpu(3)
.setPerContainerRamInGigabytes(4)
.setDeliverySemantics("ATLEAST_ONCE")
.build();
任何指示都会有所帮助。
编辑:我使用的是苍鹭的streamlet API。我用log
螺栓替换了计数螺栓,但在log
螺栓的日志中看到了相同的消息丢失问题
processingGraphBuilder.newSource(kafkaSource)
.log();
第2版:我通过完全删除streamlet API解决了这个问题。我重新实现了一切使用基本的喷口和螺栓API,并在喷口有acking。这解决了问题。我想这是因为在流API 的喷口没有发生acking
简单的答案:不应该丢弃。
几个问题:-在赫罗努伊,你的喷口的所有时间发射和ack计数是多少?-在herenui中,你的螺栓的所有执行次数、ack次数和失败次数是多少?
当您说消息被丢弃时,您看到的是在失败计数度量中注册的失败,还是只是您在螺栓中的执行计数与喷口的发射计数不一致?
在Storm兼容模式下,指标是基于样本计算的(我认为默认值为5%(。因此,计数可能会以这样的差距出局。例如,根据流的采样时间,可以发送100个元组,执行计数可以是80或120。