如何并行使用多条消息并检测所有执行何时完成



我想发送多条消息,这些消息将异步遍历同一路由,并能够知道所有处理何时完成。

由于我需要知道每条路线何时终止,我考虑使用使用InOut模式的ProducerTemplate#asyncRequestBody,以便在返回的Future对象上调用get将阻塞,直到路由终止。

到目前为止,每个请求都是异步发送到路由的,并且在所有Future调用get方法上循环将阻塞,直到所有我的路线已经走完了。

问题是,虽然请求是异步发送的,但我希望它们也能并行使用。

例如,假设p是ProducerTemplate,Rn是请求,En为端点——我想要的是:

  ->   R0 -> from(E1).to(E2).to(E3) : done.
 /
P ->   R1 -> from(E1).to(E2).to(E3) : done.
   
  ->   R2 -> from(E1).to(E2).to(E3) : done.
        ^__ Requests consumed in parallel.

经过几项研究,我偶然发现了Competing Consumers,它将执行并行化,增加了更多的消费者。

然而,由于同时有多个执行,这会减慢每条路由的执行速度,从而导致一些ExchangeTimedOutException:

The OUT message was not received within: 20000 millis due reply message with correlationID...

这并不奇怪,因为我正在发送InOut请求。但事实上,我真的不在乎回应,我用它只是为了知道当我的路线终止时。我会使用InOnly(ProducerTemplate#asyncSendBody(,但调用Future#get直到整个任务就完成了。

是否有其他方法可以异步发送请求并检测它们何时全部完成?

请注意,在我的情况下,更改超时不是一个选项。

我的第一直觉是建议使用NotifyBuilder来跟踪处理,更具体地说,使用whenBodiesDone来针对特定的实体。

编辑:

这是一个微不足道的实现,但它确实证明了一点:

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
  }
  @Component
  public static class ParallelProcessingRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
      from("seda:test?concurrentConsumers=5")
          .routeId("parallel")
          .log("Received ${body}, processing")
          .delay(5000)
          .log("Processed ${body}")
          .stop();
      from("timer:testStarter?delay=3000&period=300000")
          .routeId("test timer")
          .process(exchange -> {
            // messages we want to track
            List<Integer> toSend = IntStream.range(0, 5).boxed().collect(toList());
            NotifyBuilder builder = new NotifyBuilder(getContext())
                .fromRoute("parallel")
                .filter(e -> toSend.contains(e.getIn().getBody(Integer.class)))
                .whenDone(toSend.size())
                .create();
            ProducerTemplate template = getContext().createProducerTemplate();
            // messages we do not want to track
            IntStream.range(10, 15)
                .forEach(body -> template.sendBody("seda:test", body)); 
            toSend.forEach(body -> template.sendBody("seda:test", body)); 
            exchange.getIn().setBody(builder.matches(1, TimeUnit.MINUTES));
          })
          .log("Matched? ${body}");
    }
  }
}

这里有一个日志示例:

2016-08-06 11:45:03.861  INFO 27410 --- [1 - seda://test] parallel                                 : Received 10, processing
2016-08-06 11:45:03.861  INFO 27410 --- [5 - seda://test] parallel                                 : Received 11, processing
2016-08-06 11:45:03.864  INFO 27410 --- [2 - seda://test] parallel                                 : Received 12, processing
2016-08-06 11:45:03.865  INFO 27410 --- [4 - seda://test] parallel                                 : Received 13, processing
2016-08-06 11:45:03.866  INFO 27410 --- [3 - seda://test] parallel                                 : Received 14, processing
2016-08-06 11:45:08.867  INFO 27410 --- [1 - seda://test] parallel                                 : Processed 10
2016-08-06 11:45:08.867  INFO 27410 --- [3 - seda://test] parallel                                 : Processed 14
2016-08-06 11:45:08.867  INFO 27410 --- [4 - seda://test] parallel                                 : Processed 13
2016-08-06 11:45:08.868  INFO 27410 --- [2 - seda://test] parallel                                 : Processed 12
2016-08-06 11:45:08.868  INFO 27410 --- [5 - seda://test] parallel                                 : Processed 11
2016-08-06 11:45:08.870  INFO 27410 --- [1 - seda://test] parallel                                 : Received 0, processing
2016-08-06 11:45:08.872  INFO 27410 --- [4 - seda://test] parallel                                 : Received 2, processing
2016-08-06 11:45:08.872  INFO 27410 --- [3 - seda://test] parallel                                 : Received 1, processing
2016-08-06 11:45:08.872  INFO 27410 --- [2 - seda://test] parallel                                 : Received 3, processing
2016-08-06 11:45:08.872  INFO 27410 --- [5 - seda://test] parallel                                 : Received 4, processing
2016-08-06 11:45:13.876  INFO 27410 --- [1 - seda://test] parallel                                 : Processed 0
2016-08-06 11:45:13.876  INFO 27410 --- [3 - seda://test] parallel                                 : Processed 1
2016-08-06 11:45:13.876  INFO 27410 --- [4 - seda://test] parallel                                 : Processed 2
2016-08-06 11:45:13.876  INFO 27410 --- [5 - seda://test] parallel                                 : Processed 4
2016-08-06 11:45:13.876  INFO 27410 --- [2 - seda://test] parallel                                 : Processed 3
2016-08-06 11:45:13.877  INFO 27410 --- [r://testStarter] test timer                               : Matched? true

您会注意到NotifyBuilder是如何在结果匹配后立即返回结果的。

如果您知道正在消费的每批消息中都有X条消息,那么您可以在并行处理结束时使用聚合器。举个例子,每组消息都有自己唯一的头标记,聚合器会拾取它。在所有消息都经过处理并且所有消息都到达聚合器后,您可以将消息聚合为您想要的任何格式并返回它们。

最新更新