在 Spring 批处理中使用拆分限制线程数



我有以下 spring batch xml 文件,其中我定义了一个作业,我想在其中并行运行 4 个不同的任务:

<split id="fillNDDependencies" task-executor="asyncTaskExecutor"
next="decisionExecuteNDDependencies">
<flow>
<step id="fillTABLE1">
<tasklet ref="runTABLE1Tasklet" />
</step>
</flow>
<flow>
<step id="fillTABLE2">
<tasklet ref="runTABLE2Tasklet" />
</step>
</flow>
<flow>
<step id="fillTABLE3">
<tasklet ref="runTABLE3Tasklet" />
</step>
</flow>
<flow>
<step id="fillTABLE4">
<tasklet ref="runTABLE4Tasklet" />
</step>
</flow>
</split>

我使用 ThreadPoolTaskExecutor 来控制使用的线程数。

<bean id="asyncTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="1"></property>
<property name="maxPoolSize" value="1"></property>
<property name="queueCapacity" value="0"></property>
<property name="keepAliveSeconds" value="10"></property>
</bean>

问题是:理想情况下,我想并行运行四个任务,但有时我的机器会更"忙",我只想同时执行其中两个任务,具体取决于我的机器中可用的线程数,但不更改当前的拆分条件。

我试图在我的任务执行器中限制核心池大小和最大池大小,但是当我执行作业时出现以下错误:

org.springframework.batch.core.JobExecutionException: Flow execution ended unexpectedly
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:140)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.batch.core.job.flow.FlowExecutionException: TaskExecutor rejected task for flow=fillNDDependencies1.1
at org.springframework.batch.core.job.flow.support.state.SplitState.handle(SplitState.java:103)
at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean$DelegateState.handle(SimpleFlowFactoryBean.java:207)
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:165)
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144)
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:134)
... 5 more

因此,除非 PoolSize 与拆分内的步骤数匹配,否则作业将不起作用。

如何限制一次执行的任务,而不必在每次拆分中创建一个步骤较少的新作业?

谢谢。


请注意,这不是此问题的副本 如何在 Spring 批处理中设置多线程? ,因为我已经在使用 split 作为并行化任务执行的一种手段。

ThreadPoolTaskExecutor有一个名为queueCapacity的参数,用于设置执行程序在开始拒绝任务之前可以接受的任务数。您需要根据需要设置此参数。

现在对于工作线程的数量,我在您的示例中看到您将corePoolSizemaxPoolSize设置为 1,因此池中最多有一个线程。例如,如果您希望两个线程并行工作,则可以将corePoolSize设置为 1,将maxPoolSize设置为 2。

这些设置可以在运行时通过 JMX 动态更改。在这里查看Javadoc:https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.html#setMaxPoolSize-int-

希望这有帮助。

最新更新