Storm集群复制元组



目前我正在做一个项目,我在四个Unix主机上设置了一个Storm集群。

拓扑结构本身如下:

  1. JMS Spout为新消息侦听MQ
  2. JMS Spout解析,然后将结果发送到Esper Bolt
  3. Esper Bolt然后处理事件并将结果发送给JMS Bolt
  4. JMS Bolt然后将消息发布回另一个主题上的MQ
我意识到Storm是一个"至少一次"的框架。然而,如果我接收到5个事件并将它们传递给Esper Bolt进行计数,那么出于某种原因,我将在JMS Bolt中接收到5个计数结果(所有值都相同)。

理想情况下,我想接收一个结果输出,是否有一些方法可以告诉Storm忽略重复的元组?

我认为这与我设置的并行性有关,因为当我只有一个线程时,它会像预期的那样工作:

 TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));

我也见过Trident的"精确一次"语义。然而,我不完全相信这会解决这个问题。

如果您的Esper Bolt在其execute()方法结束时没有显式地ack()每个元组或使用iBasicBolt实现,那么它接收的每个元组最终将在超时后由您的原始JMS Spout重播。

或者,如果您要求螺栓"只处理唯一的消息",请考虑将此处理行为添加到execute()方法中。它可以首先检查本地Guava缓存中的元组值唯一性,然后进行相应的处理。

最新更新