作为调优的一部分,我一直在调整maxSpoutPending
参数。然而,如果能随时知道拓扑中有多少元组就好了,这样我就能知道这个参数对拓扑性能的影响有多大。
我在源码里翻了一遍,但什么也没找到。我能在Storm的UI中找到这个值吗?或者我可以重写一些东西来记录这个值?
您说您正在寻找有关maxTuplesPending属性有效性的见解。
使用Storm提供的KafkaSpout,(我修改了源代码,添加了更多的日志记录,以查看发生了什么)next()方法一直被调用(<1ms)。因此,从元组返回或失败(减少MaxPending计数)到将新元组发送到拓扑(再次达到MaxPending计数),我总是看到相对较快的转换(<1ms)。今天的日志显示时间戳,从一个元组被返回,然后另一个元组被发送。
2015-10-16T12:20:15.162-0500 s.k.PartitionManager [INFO] PM! 6 - ack
2015-10-16T12:20:15.163-0500 s.k.PartitionManager [INFO] PM! 177 - next
2015-10-16T12:20:15.400-0500 s.k.PartitionManager [INFO] PM! 10 - ack
2015-10-16T12:20:15.401-0500 s.k.PartitionManager [INFO] PM! 178 - next
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 22 - ack
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 180 - next
2015-10-16T12:20:16.511-0500 s.k.PartitionManager [INFO] PM! 27 - ack
2015-10-16T12:20:16.512-0500 s.k.PartitionManager [INFO] PM! 182 - next
这显示了相当瞬时的转换。对于我的用例,我的拓扑中总是有maxPending count元组的个数。
我的元组处理得也不是很快(大约1秒),所以对于处理得快得多的元组或不同类型的Spouts,我不能说
这取决于你所说的"拓扑中有多少个元组"。
- 如果你想知道喷口发出的元组有多少还没有被完全处理,你可以简单地从Storm UI中取"喷口发出"one_answers"喷口打包"的差值(你也可以通过
client.getTopologyInfo("topolgoyName")
(与client = NimbusClient.getConfiguredClient(...)
)获得这些值)。 - 如果你想知道拓扑中所有阶段的所有元组(即,在每个喷口/螺栓的所有缓冲区中),它可能相当棘手…
TopologyInfo
可能仍然有帮助,但我不确定是否/如何计算你想知道的值。
假设您的spout中有足够的消息,您可以强制spout从一开始就读取,看看您在10分钟内可以处理多少元组。(通过基本的数学计算,您可以获得每秒元组的数量)。
例如,对于kafka喷口,你可以这样做:
SpoutConfig spoutConfig = new SpoutConfig(
// your spout config
);
spoutConfig.forceFromStart = true; // this is how you tell the spout to read from the oldest kafka offset
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
然后让拓扑运行15分钟,看看拓扑在过去10分钟内处理了多少元组。