Spring Integration Transform回滚JMS失败,未转发到错误通道



使用Boot 2.2.2和Integration 5.2.2-当XML消息来源于File并且解组失败(即它不是XML(时,消息按预期继续发送到errorChannel。然而,当消息来自JMS,通过相同的通道路由并且解组失败时,它被而不是路由到errorChannel,并且消息被回滚到JMS。在那之后,我被困在SAXParseException的无休止循环中。

我从迁移JMS事件的正确最终方式中学习了这个例子,监听Spring Integration with Spring Boot.是否存在我没有考虑的隐含交易控制?如何让Spring Integration将消息转发到errorChannel并提交来自传入队列的"get"?

代码概要如下;

@Bean
public IntegrationFlow fileReader() {
return IntegrationFlows
.from(
Files
.inboundAdapter( ... )
...
.get(), e -> e.poller(Pollers.fixedDelay(1000))
)
.transform(new FileToStringTransformer())
.channel("backUpChannel")
.get();
}
@Bean
public IntegrationFlow getMessageFromJms(ConnectionFactory connectionFactory, @Value("${queues.myQueue}") String myQueue) {
return IntegrationFlows.from(
Jms
.messageDrivenChannelAdapter(connectionFactory)
.destination(myQueue)
)
.channel("backUpChannel")
.get();
}
@Bean
public IntegrationFlow doBackUp() {
return IntegrationFlows
.from("backUpChannel")
.<String>handle((payload, headers) -> {
String uuid = headers.get(MessageHeaders.ID).toString();
File backUpFile = new File("c:/backup/" + uuid + ".txt");
byte[] payloadContent = payload.getBytes();
try {
java.nio.file.Files.write(backUpFile.toPath(), payloadContent);
} catch (IOException e) {
e.printStackTrace();
}
return payload;
})
.channel("XXX")
.get();
}
@Bean
public Jaxb2Marshaller unmarshaller() {
Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
unmarshaller.setClassesToBeBound(MyClass.class);
return unmarshaller;
}
@Bean
public IntegrationFlow handleParseXml() {
return IntegrationFlows
.from("XXX")
.transform(new UnmarshallingTransformer(unmarshaller()))
.channel("YYY")
.get();
}

您需要将.errorChannel(...)添加到消息驱动的通道适配器中。

最新更新