风暴号,有没有办法记录飞行中有多少元组



作为调优的一部分,我一直在调整maxSpoutPending参数。然而,如果能随时知道拓扑中有多少元组就好了,这样我就能知道这个参数对拓扑性能的影响有多大。

我在源码里翻了一遍,但什么也没找到。我能在Storm的UI中找到这个值吗?或者我可以重写一些东西来记录这个值?

您说您正在寻找有关maxTuplesPending属性有效性的见解。

使用Storm提供的KafkaSpout,(我修改了源代码,添加了更多的日志记录,以查看发生了什么)next()方法一直被调用(<1ms)。因此,从元组返回或失败(减少MaxPending计数)到将新元组发送到拓扑(再次达到MaxPending计数),我总是看到相对较快的转换(<1ms)。今天的日志显示时间戳,从一个元组被返回,然后另一个元组被发送。

2015-10-16T12:20:15.162-0500 s.k.PartitionManager [INFO] PM! 6 - ack
2015-10-16T12:20:15.163-0500 s.k.PartitionManager [INFO] PM! 177 - next
2015-10-16T12:20:15.400-0500 s.k.PartitionManager [INFO] PM! 10 - ack
2015-10-16T12:20:15.401-0500 s.k.PartitionManager [INFO] PM! 178 - next
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 22 - ack
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 180 - next
2015-10-16T12:20:16.511-0500 s.k.PartitionManager [INFO] PM! 27 - ack
2015-10-16T12:20:16.512-0500 s.k.PartitionManager [INFO] PM! 182 - next 

这显示了相当瞬时的转换。对于我的用例,我的拓扑中总是有maxPending count元组的个数。

我的元组处理得也不是很快(大约1秒),所以对于处理得快得多的元组或不同类型的Spouts,我不能说

这取决于你所说的"拓扑中有多少个元组"。

  1. 如果你想知道喷口发出的元组有多少还没有被完全处理,你可以简单地从Storm UI中取"喷口发出"one_answers"喷口打包"的差值(你也可以通过client.getTopologyInfo("topolgoyName")(与client = NimbusClient.getConfiguredClient(...))获得这些值)。
  2. 如果你想知道拓扑中所有阶段的所有元组(即,在每个喷口/螺栓的所有缓冲区中),它可能相当棘手…TopologyInfo可能仍然有帮助,但我不确定是否/如何计算你想知道的值。

假设您的spout中有足够的消息,您可以强制spout从一开始就读取,看看您在10分钟内可以处理多少元组。(通过基本的数学计算,您可以获得每秒元组的数量)。

例如,对于kafka喷口,你可以这样做:

        SpoutConfig spoutConfig = new SpoutConfig(
          // your spout config
         );   
    spoutConfig.forceFromStart = true; // this is how you tell the spout to read from the oldest kafka offset
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

然后让拓扑运行15分钟,看看拓扑在过去10分钟内处理了多少元组。

最新更新