Spring Integration - Poller 不处理来自 JobLaunchingGateway 的 Mess



在我的 Spring 集成/Spring 批处理应用程序中寻找有关错误处理的建议。

上下文

我正在使用入站文件适配器轮询输入目录中的文件。该文件作为参数通过 JobLaunchingGateway 传递给 Spring Batch 作业。作业使用 TaskExecutor 在其自己的线程池上运行。

当作业完成时,文件将移动到已处理的目录,或在作业执行失败时移动到错误目录。

轮询器使用AcceptOnceFileListFilter由于缓存位于内存中,因此在应用程序重新启动之间不会保留已处理的文件。这对我的应用程序来说不是问题,因为如果相同的文件在以后的时间点出现在输入目录中,Spring Batch 会引发异常。

问题

第 1 天 - 成功处理输入文件abc_success005.csv

几天后(应用程序从第 1 天开始重新启动) - 当同一个文件错误地到达输入目录时,我注意到轮询器线程收到了来自JobLaunchingGateway的错误,并且它没有继续处理当前轮询周期中的后续消息。

请注意,轮询员在轮询间隔到期后按计划恢复职责并继续操作。

SimpleJobLauncher 使用 TaskExecutor 在单独的线程池中执行作业。但是,作业执行的创建使用调用方的线程,在我的例子中是轮询器。

SimpleJobLauncher - 来自 Spring 批处理源代码

jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
try {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
if (logger.isInfoEnabled()) {
logger.info("Job: [" + job + "] launched with the following parameters: [" + jobParameters
+ "]");
}
job.execute(jobExecution);

我的集成流程

@Bean
public IntegrationFlow myIntegrationFlow(JobLaunchingGateway jobLaunchingGateway,
FileMessageToJobRequest fileMessageToJobRequest) {
return IntegrationFlows.from(Files.inboundAdapter(new File(properties.getInputDir()))
.filter(new AcceptOnceFileListFilter<>()),
c -> c.poller(Pollers.fixedRate(300, TimeUnit.SECONDS)
.taskExecutor(taskExecutor())
.maxMessagesPerPoll(50)
))
.transform(fileMessageToJobRequest)
.handle(jobLaunchingGateway)
.log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload")
.get();
}

堆栈跟踪

[ERROR] 2022-08-15 11:59:46,853 o.s.i.h.LoggingHandler error taskExecutor-2id taskExecutor-2 - org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input_file_name=/var/tmp/batch/input/abc_success005.csv}.  If you want to run this job again, change the parameters., failedMessage=GenericMessage [payload=JobLaunchRequest: processStuffJob, parameters={input_file_name=/var/tmp/batch/input/abc_success005.csv}, headers={file_originalFile=/var/tmp/batch/input/abc_success005.csv, id=298019a3-b548-2a26-1469-78394758d824, file_name=abc_success005.csv, file_relativePath=abc_success005.csv, timestamp=1660579186811}]
at org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:78)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input_file_name=/var/tmp/batch/input/abc_success005.csv}.  If you want to run this job again, change the parameters.
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:139)

轮询器线程通常获取一堆文件/消息并将其发送到 JobLaunchingGateway 进行处理。当作业无法启动时,轮询器不会处理异常,因此文件其余部分的处理将终止。

我知道这不会阻止轮询器恢复下一个周期,但不希望一个错误文件结束当前的轮询周期。这也可能导致以下情况:一个错误文件(保持原样时)可能会导致问题在接下来的几个轮询周期中重复出现。

表单 Spring 集成源代码 - 抽象轮询端点

private Runnable createPoller() {
return () ->
this.taskExecutor.execute(() -> {
int count = 0;
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
if (this.maxMessagesPerPoll == 0) {
logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
break;
}
if (pollForMessage() == null) {
break;
}
count++;
}
});
}

在这种情况下,推荐的解决方案是什么?

我能想到的一个选项是扩展 JobLaunchingGateway 并处理文件(移动到错误目录)+ 记录/吞下此异常。从功能的角度来看,除了将文件移出+记录错误之外,这里没有太多要做的。

我写这篇文章是为了寻求更好的解决方案。我认为一个坏文件饿死其他文件的执行是不可取的(即使在轮询周期内)

轮询器线程通常获取一堆文件/消息并将其发送到 JobLaunchingGateway 进行处理。

这是不正确的。请参阅FileReadingMessageSource。它确实在第一个请求时获取:

protected AbstractIntegrationMessageBuilder<File> doReceive() {
// rescan only if needed or explicitly configured
if (this.scanEachPoll || this.toBeReceived.isEmpty()) {
scanInputDirectory();
}
File file = this.toBeReceived.poll();

我们只从该队列中生成一个到上述if (pollForMessage() == null) {

确实,如果出现错误,当前的轮询周期将被取消,但我们不会忽略其他缓存文件。我们只是回到下一个轮询周期的下一个this.toBeReceived.poll()

它只是首先以这种方式设计的:maxMessagesPerPoll是单个轮询周期工作单元的一部分。以及我们从单个执行者的任务中轮询的所有消息。因此,任务中间某处的失败就像任务取消一样。

如果您仍然觉得这不合适,请查看maxMessagesPerPoll = 1是否适合您,因此您的单个轮询周期将针对单个文件。

另一种解决方案是捕获该.handle(jobLaunchingGateway)上的异常,并且不要让它传回轮询器。查看ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

相关内容

  • 没有找到相关文章

最新更新