如何在Spring Batch作业之间传递数据



我熟悉如何在Spring Batch作业的步骤之间传递数据。但是,当你的工作由许多较小的工作组成时,会发生什么呢?在下面的例子中,我想在第一个作业siNotificationJob结束时的JobExecutionContext中设置一些数据。然后,可以从下一个作业ciNotificationJob中StepExecutionContext的JobExecutionContext中读取该数据。我需要以某种方式推广这些数据吗?在用于配置作业参数的步骤"ciNotificationJob"中定义的作业参数提取器中,我似乎看不到结果。

想法?

Andrew

<job id="notificationJob" xmlns="http://www.springframework.org/schema/batch">
<batch:step id="pn_step_0" next="pn-step-1">
<batch:job ref="siNotificationJob" job-launcher="jobLauncher" 
job-parameters-extractor="jobParamsExtractor"/>
</batch:step>       
<batch:step id="pn-step-1" next="pn-step-2">
<batch:job ref="ciNotificationJob" job-launcher="jobLauncher" 
job-parameters-extractor="jobParamsExtractor"/>
</batch:step>           
</job>

我解决了这个问题。我将通过例子向你们展示我是如何解决它的。它很复杂,但我认为最终结果相当容易理解。

我有一个整体的工作叫做"通知工作"。它有三个步骤,调用3个不同的作业(而不是步骤)。这些作业中的每一个都可以独立运行,也可以从顶级"notificationJob"中调用。此外,每个子作业都有许多步骤。我不打算在这里展示所有这些步骤,只是想强调一下,这些都是完整的工作,有更多的步骤。

<job id="notificationJob" xmlns="http://www.springframework.org/schema/batch">
<batch:listeners>
<batch:listener ref="pn_job-parent-listener" />
</batch:listeners>
<batch:step id="pn_step-0" next="pn-step-1">
<batch:job ref="siNotificationJob" job-launcher="jobLauncher" 
job-parameters-extractor="futureSiParamsExtractor"/>
</batch:step>
<batch:step id="pn-step-1" next="pn-step-2">
<batch:job ref="ciNotificationJob" job-launcher="jobLauncher" 
job-parameters-extractor="futureCiParamsExtractor"/>
</batch:step>
<batch:step id="pn-step-2">
<batch:job ref="combineResultsJob" job-launcher="jobLauncher" 
job-parameters-extractor="jobParamsExtractor"/>
</batch:step>           
</job>

关键是能够从一个作业中提取结果,并在下一个作业中将其读取。现在,你可以通过多种方式做到这一点。一种方法是将一个作业的结果输出到DB或文本文件中,然后从该文件/表中读取下一个作业。由于我没有处理那么多数据,所以我在记忆中传递信息。因此,您将注意到作业参数提取器。您可以依赖参数提取器的内置实现,也可以实现自己的提取器。我实际上两者都用。他们所做的只是从StepExecution中提取价值,然后我们需要将其提升/移动到下一个子作业。

<bean id="jobParamsExtractor" class="org.springframework.batch.core.step.job.DefaultJobParametersExtractor">
<property name="keys">
<list>
<value>OUTPUT</value>
</list>
</property>
</bean>
<bean id="futureSiParamsExtractor" class="jobs.SlideDatesParamExtractor">
<property name="mode" value="FORWARD" />
<property name="addedParams">
<map><entry>
<key><value>outputName</value></key>
<value>FUTURE_SI_JOB_RESULTS</value>
</entry></map>
</property>
</bean> 
<bean id="futureCiParamsExtractor" class="jobs.SlideDatesParamExtractor">
<property name="mode" value="FORWARD" />
<property name="addedParams">
<map><entry>
<key><value>outputName</value></key>
<value>FUTURE_CI_JOB_RESULTS</value>
</entry></map>
</property>
</bean>

最后,您会注意到有一个父工作监听器。这就是将状态从一个作业转移到下一个作业的魔力。这是我实现的类。

<bean id="pn_job-state-listener" class="jobs.JobStateListener">
<property name="parentJobListener" ref="pn_job-parent-listener" />
</bean> 
<bean id="pn_job-parent-listener" class="cjobs.ParentJobListener">
</bean>
package jobs.permnotification;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
public class ParentJobListener implements JobExecutionListener
{
private JobExecution parentExecution;
@Override
public void beforeJob(JobExecution jobExecution)
{
this.parentExecution = jobExecution;
}
@Override
public void afterJob(JobExecution jobExecution)
{
// TODO Auto-generated method stub
}
public void setParentExecution(JobExecution parentExecution)
{
this.parentExecution = parentExecution;
}
public JobExecution getParentExecution()
{
return parentExecution;
}
}

package jobs.permnotification;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
public class JobStateListener implements JobExecutionListener
{
private ParentJobListener parentJobListener;

@Override
public void beforeJob(JobExecution jobExecution)
{
if(parentJobListener == null || parentJobListener.getParentExecution() == null) return;
passStateFromParentToJob(StepKey.FUTURE_SI_JOB_RESULTS.toString(), jobExecution);
passStateFromParentToJob(StepKey.FUTURE_CI_JOB_RESULTS.toString(), jobExecution);
passStateFromParentToJob(StepKey.OUTPUT.toString(), jobExecution);
}
@Override
public void afterJob(JobExecution jobExecution)
{
if(parentJobListener == null || parentJobListener.getParentExecution() == null) return;
//take state from child step and move it into the parent execution context
passStateFromJobToParent(StepKey.FUTURE_SI_JOB_RESULTS.toString(), jobExecution);
passStateFromJobToParent(StepKey.FUTURE_CI_JOB_RESULTS.toString(), jobExecution);
passStateFromJobToParent(StepKey.OUTPUT.toString(), jobExecution);
}
private void passStateFromJobToParent(String key, JobExecution jobExecution)
{
Object obj = jobExecution.getExecutionContext().get(key);
if(obj != null)
parentJobListener.getParentExecution().getExecutionContext().put(key, obj);
}
private void passStateFromParentToJob(String key, JobExecution jobExecution)
{
Object obj = parentJobListener.getParentExecution().getExecutionContext().get(key);
if(obj != null)
jobExecution.getExecutionContext().put(key, obj);
}
public void setParentJobListener(ParentJobListener parentJobListener)
{
this.parentJobListener = parentJobListener;
}
public ParentJobListener getParentJobListener()
{
return parentJobListener;
}
}

这有点像黑客。。。。建议您改用spring集成。。但看看这是否适用于你的情况。

如果您已经设置了spring-batch元数据表,那么如果查询最近运行的作业的表,您可能可以获得在每个作业中生成的数据。作业执行上下文中的所有数据都已存储并可以查询。

春季批处理元表

最新更新