我在开发一个消息路由器的过程中,我来到了处理意外的点,并放置了一些错误处理。
在我的代码中,我能够区分由于错误消息而发生异常的时间,这是我想将消息发送到死信结束点的唯一一种情况。在所有其他情况下,我考虑由基础设施问题引起的异常(例如,数据库/JMS端点变得不可用),在这种情况下,我希望将消息回滚到路由起点。
查看Camel文档,唯一一个支持死信的错误处理程序是DeadLetterChannel
,但问题是这个错误处理程序不被处理。
那么有没有一种方法可以实现我想要的,有多简单或困难?我可以看到你可以配置骆驼上下文使用自定义错误处理程序构建器,并考虑尝试在我自己的构建器中做TransactionErrorHandler
和DeadLetterChannel
之间的组合,但我不确定这是否是要走的路。开箱即用的构建器似乎有相当复杂的逻辑在那里。
另一种选择是扩展TransactionErrorHandlerBuilder
,并从DeadLetterChannelBuilder
中引入验证终点并创建故障处理器的代码,但同样不确定。如果有那么简单,Camel的人就会把它包含在框架中。我的用例必须是处理企业关键应用程序的任何人的用例。
提前感谢您的建议。如有任何提示,不胜感激。
我试图扩展TransactionErrorHandlerBuilder
如上所述,它没有工作,因为它创建的失败处理器从未使用过。
因为我真的需要这个功能,我试着让我的代码添加一个**"dead。Letter =true",并将原始消息放回交换中,如下所示:
@Override
public void process(Exchange exchange) {
Message incomingMessage = exchange.getIn();
try {
// Do some work here
} catch (MyCustomException e) {
incomingMessage.setHeader("dead.letter", "true");
exchange.setIn(incomingMessage);
} catch (Exception e) {
exchange.setException(e);
}
}
然后在我的路由定义中添加:
from(ep).routeId(createRouteId(system))
.autoStartup(false).transacted()
.threads(route.getThreads())
.filter(body().isNotNull())
.process((Processor) routeBean)
.choice()
.when(header("dead.letter").isNotNull())
.to(mq1:ERROR.QUEUE);
我以为那会解决我的问题。然而我的错误。QUEUE没有得到任何东西(日志中也没有任何东西),事务提交,我的消息丢失了。
请帮助我,因为我已经没有主意了。
我遇到了同样的问题。我不知道为什么TransactionErrorHandlerBuilder不能配置为发送消息到死信通道。我对它的方法setDeadLetterUri()
感到困惑,因为它不像我预期的那样工作。经过长时间的研究,我找到了一个相当简单的解决办法。
它在事务中处理消息,如果出现异常,事务将回滚,原始消息将被发送到死信通道。请注意,您应该明确允许使用原始消息。
@Override
public void configure() {
getContext().setAllowUseOriginalMessage(true);
from("jms:queue:{{foo.bar}}")
.doTry()
.to("direct:processMsgInTransaction")
.doCatch(Exception.class)
.process(exchange -> exchange.setMessage(exchange.getUnitOfWork().getOriginalInMessage()))
.to("jms:queue:{{foo.bar.dead.letter}}")
.endDoTry();
from("direct:processMsgInTransaction")
.errorHandler(new TransactionErrorHandlerBuilder()
.onExceptionOccurred(exchange -> log.error("Something went wrong")))
//.transacted() //uncomment for using default TransactionErrorHandler
.process(someProcessor)
.process(someOtherProcessor);
}