Spring Integration error handling: sending an email only once at the end



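How can I handle errors thrown in the Spring Integration flow below by logging the error and continuing with the next record? Also, only one email should be sent, regardless of whether one record or several records fail. Is there something similar to Spring Batch's StepListener? Thanks.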

return IntegrationFlows.from(jdbcMessageSource(), p -> p.poller(pollerSpec()))
        .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, appErrorChannel()))
        .split()
        .channel(c -> c.executor(Executors.newCachedThreadPool()))
        .transform(transformer, "transform")
        .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE))
        .handle(Http.outboundGateway(url)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class)
                .requestFactory(requestFactory))
        .get();

@Bean
public MessageChannel appErrorChannel() {
    return new DirectChannel();
}

@Bean("appErrorFlow")
public IntegrationFlow appErrorFlow() {
    // @formatter:off
    return IntegrationFlows.from(appErrorChannel())
            .log(Level.ERROR, message -> " Failed Message " + message.getPayload())
            .aggregate(a -> a.correlationStrategy(m -> !m.getHeaders().isEmpty()))
            .handle(Http.outboundGateway(mailURL)
                    .httpMethod(HttpMethod.POST))
            .get();
    // @formatter:on
}

Exception:

2022-01-13 06:41:31,702 [Worker-1                  ] WARN  o.s.i.c.MessagePublishingErrorHandler - 005006f2dee5b673 Error message was not delivered.
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unknown.channel.name'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=ErrorMessage [payload=*******Removed On Purpose*******]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:96)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:60)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:64)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 11 common frames omitted

I think the closest equivalent to StepListener is ChannelInterceptor.

See the IntegrationFlowDefinition.intercept() operator, which can be placed between endpoints in that flow.
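As a rough illustration of a listener-style interceptor, here is a minimal sketch. The class name RecordLoggingInterceptor and the console logging are assumptions made for this example, not something from the question. It only relies on the preSend and afterSendCompletion callbacks of Spring's ChannelInterceptor interface.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

/** Hypothetical listener-style interceptor: logs each message around the send. */
public class RecordLoggingInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        System.out.println("About to send: " + message.getPayload());
        // Returning null here would prevent the send, so pass the message through.
        return message;
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel,
            boolean sent, Exception ex) {
        // ex is non-null only when the send itself raised an exception.
        if (ex != null) {
            System.err.println("Failed: " + message.getPayload() + " -> " + ex.getMessage());
        }
    }
}
```

It would be registered with something like .intercept(new RecordLoggingInterceptor()) between two endpoints of the flow. Note that when the intercepted channel hands messages to an executor, the send completes before the downstream work runs, so failures on the worker thread would probably not show up in afterSendCompletion; this is worth checking against your own setup.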

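However, judging by your splitter configuration, it does not look like you are going to lose anything; the error is just going to be logged. The c.executor() call does the trick of shifting the work for each record to a thread pool, so an error in one record does not affect the rest of the records.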
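The isolation effect can be seen in a plain-Java analogy. This is not Spring Integration code, only a sketch of the same idea using java.util.concurrent; the class name, the handle method, and the convention that records starting with "bad" fail are all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class IsolatedRecordProcessing {

    /** Outcome of one batch: the records that succeeded and the ones that failed. */
    public static final class BatchResult {
        public final List<String> succeeded;
        public final List<String> failed;

        BatchResult(List<String> succeeded, List<String> failed) {
            this.succeeded = succeeded;
            this.failed = failed;
        }
    }

    /** Hypothetical per-record work; records starting with "bad" simulate a failure. */
    static void handle(String record) {
        if (record.startsWith("bad")) {
            throw new IllegalStateException("cannot process " + record);
        }
    }

    public static BatchResult processBatch(List<String> records) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Queue<String> succeeded = new ConcurrentLinkedQueue<>();
        Queue<String> failed = new ConcurrentLinkedQueue<>();
        for (String record : records) {
            // Each record is its own task, so an exception stays inside that task.
            pool.execute(() -> {
                try {
                    handle(record);
                    succeeded.add(record);
                } catch (RuntimeException e) {
                    System.err.println("Failed record " + record + ": " + e.getMessage());
                    failed.add(record);
                }
            });
        }
        pool.shutdown();
        try {
            // Wait for all per-record tasks before summarizing the batch.
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new BatchResult(new ArrayList<>(succeeded), new ArrayList<>(failed));
    }

    public static void main(String[] args) {
        BatchResult result = processBatch(List.of("a", "bad-1", "b"));
        System.out.println(result.succeeded.size() + " succeeded, "
                + result.failed.size() + " failed");
    }
}
```

The failing record is logged and recorded, while the other records complete normally, which is roughly what the executor channel after split() gives you in the flow.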

Anyway, it sounds better to set a custom error channel in the headers before split(): .enrichHeaders(Collections.singletonMap(MessageHeaders.ERROR_CHANNEL, myErrorChannel())).

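Then you have an aggregator subscribed to that channel, where you aggregate the errors for the batch and emit a message for sending the email. This happens only once per batch, and does not happen at all if there are no errors.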
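One possible way to wire such an aggregator is sketched below, assuming Spring Integration 5.x (the IntegrationFlows factory used in the question). The class and bean names, the 10-second group timeout, and the final handler that just prints instead of calling the mail endpoint are assumptions for illustration. Error messages typically do not carry the splitter's sequence headers themselves, so this sketch correlates on the correlationId header of the failed message, and it releases the group by timeout because the number of failures per batch is not known in advance.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;

@Configuration
public class ErrorReportConfig {

    @Bean
    public MessageChannel myErrorChannel() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow errorReportFlow() {
        return IntegrationFlows.from(myErrorChannel())
                .aggregate(a -> a
                        // Group errors by the batch they came from.
                        .correlationStrategy(ErrorReportConfig::batchKey)
                        // Never release by size; the failure count is unknown.
                        .releaseStrategy(group -> false)
                        // Assumed value: release once no new error arrived for 10 s.
                        .groupTimeout(10_000)
                        .sendPartialResultOnExpiry(true))
                // Placeholder for the mail call: one message per batch with failures.
                .handle(message -> System.err.println(
                        "Would send one email for failures: " + message.getPayload()))
                .get();
    }

    /** Uses the splitter's correlationId of the failed message as the batch key. */
    private static Object batchKey(Message<?> errorMessage) {
        Object payload = errorMessage.getPayload();
        if (payload instanceof MessagingException) {
            Message<?> failed = ((MessagingException) payload).getFailedMessage();
            if (failed != null) {
                Object id = failed.getHeaders()
                        .get(IntegrationMessageHeaderAccessor.CORRELATION_ID);
                if (id != null) {
                    return id;
                }
            }
        }
        // Fallback key when the failed message is not available.
        return "unknown-batch";
    }
}
```

The aggregated payload is the list of exceptions collected for the batch, so the final step can build a single email body from it. If no record fails, nothing reaches the channel and no email is produced.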
