这是对上一个问题的跟进(原始问题中给出的要求(。
弹簧集成 - 过滤器 - 将消息发送到不同的端点
我的问题是,如果输入文件中有多个错误,则只会记录第一个错误。不会记录后续错误。
修改后的代码:
@Configuration
public class CreateUserConfiguration {
@Bean
public IntegrationFlow createUser() {
return IntegrationFlows.from(Files.inboundAdapter(new File(INPUT_DIR)))
.enrichHeaders(h -> h.header("errorChannel", "exceptionChannel", true))
.transform(csvToUserBeanTransformer, "convertCsvToUserBean")
.split(userBeanSplitter, "splitUserBeans")
.wireTap(flow -> flow.<UserBean>filter(userBean -> !userBean.getStatus().equalsIgnoreCase("SUCCESS")).channel("errorSummaryReportGenerationChannel"))
.transform(userBeanToJSONTransformer, "convertUserBeanToJSON")
.handle(Files.outboundAdapter(new File(OUTPUT_SUCCESS_DIRECTORY)))
.get();
}
@Bean
public IntegrationFlow logErrorSummary() {
return IntegrationFlows.from("errorSummaryReportGenerationChannel")
.handle((p,h) -> {
return ((UserBean)(p)).getUserID() + "t" + ((UserBean)(p)).getStatus();
})
.transform(Transformers.objectToString())
.handle(Files.outboundAdapter(new File(OUTPUT_FAILED_REPORT_FILE_NAME)))
.get();
}
@Bean
public IntegrationFlow logError() {
return IntegrationFlows.from("exceptionChannel")
.enrichHeaders(h -> h.headerExpression("errorFileName", "payload.failedMessage.headers.fileName"))
.wireTap(flow -> flow.handle(msg -> System.out.println("Received on exceptionChannel " + msg.getHeaders().get("errorFileName"))))
.transform(Transformers.objectToString())
.handle(Files.outboundAdapter(new File(generateOutputDirectory(OUTPUT_FAILED_DIRECTORY))).autoCreateDirectory(true).fileExistsMode(FileExistsMode.APPEND).fileNameExpression("getHeaders().get("errorFileName")+'.json'"))
.get();
}
@Bean(name = "exceptionChannel")
MessageChannel exceptionChannel() {
return MessageChannels.executor(new SimpleAsyncTaskExecutor()).get();
}
@Bean(name="errorSummaryReportGenerationChannel")
MessageChannel errorSummaryReportGenerationChannel() {
return DirectChannel();
}
}
我的期望:
错误摘要报告 -
B123 ERROR, FREQUENCY
C123 FREQUENCY_DETAIL
在OUTPUT_FAILED_DIRECTORY -
B123.json -> stacktrace of error
C123.json -> stacktrace of error
我所看到的:(C123信息丢失(
错误摘要报告 -
B123 ERROR, FREQUENCY
在OUTPUT_FAILED_DIRECTORY -
B123.json -> stacktrace of error
问题从.split(userBeanSplitter, "splitUserBeans")
弹出。
我想说的是,它与我们在普通Java中所做的完全相似,具有for
循环。 因此,如果循环中的方法引发异常,则您确实离开了循环,并且将不再处理下一项。
要解决您的问题,您需要添加一个.channel(c -> c.executor(myExecutor()))
来并行处理拆分的项目,并在单独的线程中进行错误处理。这样,分路器中的环路就不会受到影响。