我想发送多条消息,这些消息将异步遍历同一路由,并能够知道所有处理何时完成。
由于我需要知道每条路线何时终止,我考虑使用使用InOut模式的ProducerTemplate#asyncRequestBody
,以便在返回的Future
对象上调用get
将阻塞,直到路由终止。
到目前为止,每个请求都是异步发送到路由的,并且在所有Future调用get方法上循环将阻塞,直到所有我的路线已经走完了。
问题是,虽然请求是异步发送的,但我希望它们也能并行使用。
例如,假设p是ProducerTemplate
,Rn是请求,En为端点——我想要的是:
-> R0 -> from(E1).to(E2).to(E3) : done.
/
P -> R1 -> from(E1).to(E2).to(E3) : done.
-> R2 -> from(E1).to(E2).to(E3) : done.
^__ Requests consumed in parallel.
经过几项研究,我偶然发现了Competing Consumers,它将执行并行化,增加了更多的消费者。
然而,由于同时有多个执行,这会减慢每条路由的执行速度,从而导致一些ExchangeTimedOutException
:
The OUT message was not received within: 20000 millis due reply message with correlationID...
这并不奇怪,因为我正在发送InOut请求。但事实上,我真的不在乎回应,我用它只是为了知道当我的路线终止时。我会使用InOnly(ProducerTemplate#asyncSendBody
(,但调用Future#get
直到整个任务就完成了。
是否有其他方法可以异步发送请求并检测它们何时全部完成?
请注意,在我的情况下,更改超时不是一个选项。
我的第一直觉是建议使用NotifyBuilder来跟踪处理,更具体地说,使用whenBodiesDone
来针对特定的实体。
编辑:
这是一个微不足道的实现,但它确实证明了一点:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Component
public static class ParallelProcessingRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("seda:test?concurrentConsumers=5")
.routeId("parallel")
.log("Received ${body}, processing")
.delay(5000)
.log("Processed ${body}")
.stop();
from("timer:testStarter?delay=3000&period=300000")
.routeId("test timer")
.process(exchange -> {
// messages we want to track
List<Integer> toSend = IntStream.range(0, 5).boxed().collect(toList());
NotifyBuilder builder = new NotifyBuilder(getContext())
.fromRoute("parallel")
.filter(e -> toSend.contains(e.getIn().getBody(Integer.class)))
.whenDone(toSend.size())
.create();
ProducerTemplate template = getContext().createProducerTemplate();
// messages we do not want to track
IntStream.range(10, 15)
.forEach(body -> template.sendBody("seda:test", body));
toSend.forEach(body -> template.sendBody("seda:test", body));
exchange.getIn().setBody(builder.matches(1, TimeUnit.MINUTES));
})
.log("Matched? ${body}");
}
}
}
这里有一个日志示例:
2016-08-06 11:45:03.861 INFO 27410 --- [1 - seda://test] parallel : Received 10, processing
2016-08-06 11:45:03.861 INFO 27410 --- [5 - seda://test] parallel : Received 11, processing
2016-08-06 11:45:03.864 INFO 27410 --- [2 - seda://test] parallel : Received 12, processing
2016-08-06 11:45:03.865 INFO 27410 --- [4 - seda://test] parallel : Received 13, processing
2016-08-06 11:45:03.866 INFO 27410 --- [3 - seda://test] parallel : Received 14, processing
2016-08-06 11:45:08.867 INFO 27410 --- [1 - seda://test] parallel : Processed 10
2016-08-06 11:45:08.867 INFO 27410 --- [3 - seda://test] parallel : Processed 14
2016-08-06 11:45:08.867 INFO 27410 --- [4 - seda://test] parallel : Processed 13
2016-08-06 11:45:08.868 INFO 27410 --- [2 - seda://test] parallel : Processed 12
2016-08-06 11:45:08.868 INFO 27410 --- [5 - seda://test] parallel : Processed 11
2016-08-06 11:45:08.870 INFO 27410 --- [1 - seda://test] parallel : Received 0, processing
2016-08-06 11:45:08.872 INFO 27410 --- [4 - seda://test] parallel : Received 2, processing
2016-08-06 11:45:08.872 INFO 27410 --- [3 - seda://test] parallel : Received 1, processing
2016-08-06 11:45:08.872 INFO 27410 --- [2 - seda://test] parallel : Received 3, processing
2016-08-06 11:45:08.872 INFO 27410 --- [5 - seda://test] parallel : Received 4, processing
2016-08-06 11:45:13.876 INFO 27410 --- [1 - seda://test] parallel : Processed 0
2016-08-06 11:45:13.876 INFO 27410 --- [3 - seda://test] parallel : Processed 1
2016-08-06 11:45:13.876 INFO 27410 --- [4 - seda://test] parallel : Processed 2
2016-08-06 11:45:13.876 INFO 27410 --- [5 - seda://test] parallel : Processed 4
2016-08-06 11:45:13.876 INFO 27410 --- [2 - seda://test] parallel : Processed 3
2016-08-06 11:45:13.877 INFO 27410 --- [r://testStarter] test timer : Matched? true
您会注意到NotifyBuilder
是如何在结果匹配后立即返回结果的。
如果您知道正在消费的每批消息中都有X条消息,那么您可以在并行处理结束时使用聚合器。举个例子,每组消息都有自己唯一的头标记,聚合器会拾取它。在所有消息都经过处理并且所有消息都到达聚合器后,您可以将消息聚合为您想要的任何格式并返回它们。