Camel拆分然后聚合异常传播



我的项目需要先拆分后聚合操作,而不是集成的拆分+聚合操作,但我不知道如何传播异常以停止进一步处理。

在下面的示例中,如果在聚合后抛出异常,我需要能够不生成最后一个日志。

@RunWith(SpringJUnit4ClassRunner.class)
public class AggregationExceptionTest extends CamelTestSupport {
private final Logger LOGGER = LoggerFactory.getLogger(AggregationExceptionTest.class);
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")    
.split(body()).streaming().stopOnException().parallelProcessing()
.aggregate(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange.getException() != null) {
LOGGER.info("exception propagated");
}
return oldExchange==null?newExchange:oldExchange;
}
}).constant(true)
.completionSize(1).completionTimeout(500)
.log(LoggingLevel.INFO, LOGGER, "Aggreg ${body}")
.throwException(Exception.class, "propagate plz")
.end()
.end()
.process(e -> {
LOGGER.info("I don't want to be seen, because of {}", e.getException());
});
}
};
}

@Test
public void test1() throws InterruptedException {
template.sendBody("direct:start", Arrays.asList("A", "B", "C", "D"));

Thread.sleep(5000);
}
}

抛出的异常在聚合方法中永远不可见。

我不确定这是否可能。聚合器输出与调用它的路由完全分离。它也在自己的线程中运行,除非您为它提供SynchronousExecutorService实例。

您可以尝试SynchronousExecutorService选项:

.aggregate(constant(1), new YourAggregator())
.executorService(new SynchronousExecutorService())

在拆分之前,在标头中设置一个Map实例,该实例具有一个名为exception的键,初始值为null。然后在聚合器输出中,如果发生异常,请使用该异常更新Map键。然后,在您的拆分中,您可以检查Exception实例的Map键,并执行任何您想要的操作,例如,拆分器的stopOnException选项。

最新更新