为什么Spring批处理作为单线程而不是多线程执行



我正在通过石英调度器调用一个spring批处理作业,该作业应该每1分钟运行一次。当作业第一次运行时,ItemReader将成功打开并运行作业。然而,当作业尝试第二次运行时,它使用的是与第一次相同的实例,该实例已经初始化,并接收到"java.lang.IollegalStateException:Stream已经初始化。在重新打开之前关闭。"我已将范围设置为项读取器和项写入器的步骤。

如果我在配置上做错了什么,请告诉我?

<?xml version="1.0" encoding="UTF-8"?>
<import resource="context.xml"/>
<import resource="database.xml"/>   
<bean id="MyPartitioner" class="com.MyPartitioner" />
<bean id="itemProcessor" class="com.MyProcessor" scope="step" />
<bean id="itemReader" class="com.MyItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql" value="query...."/>
    <property name="rowMapper">
        <bean class="com.MyRowMapper" scope="step"/>
    </property>
</bean>
<job id="MyJOB" xmlns="http://www.springframework.org/schema/batch">
    <step id="masterStep">
        <partition step="slave" partitioner="MyPartitioner">
            <handler grid-size="10" task-executor="taskExecutor"/>
        </partition>
    </step>
</job>
<step id="slave" xmlns="http://www.springframework.org/schema/batch">
    <tasklet>
        <chunk reader="itemReader" writer="mysqlItemWriter" processor="itemProcessor" commit-interval="100"/>
    </tasklet>
</step>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="20"/>
    <property name="maxPoolSize" value="20"/>
    <property name="allowCoreThreadTimeOut" value="true"/>
</bean>
<bean id="mysqlItemWriter" class="com.MyItemWriter" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[      
            query.....
        ]]>
        </value>
    </property>
<property name="itemPreparedStatementSetter">
        <bean class="com.MyPreparedStatementSetter" scope="step"/>
    </property>
</bean>

Quartz作业调用程序-

Scheduler scheduler = new    StdSchedulerFactory("quartz.properties").getScheduler();
JobKey jobKey = new JobKey("QUARTZJOB", "QUARTZJOB");
JobDetail jobDetail =     JobBuilder.newJob("com.MySpringJobInvoker").withIdentity(jobKey).build();
jobDetail.getJobDataMap().put("jobName", "SpringBatchJob");
SimpleTrigger smplTrg = newTrigger().withIdentity("QUARTZJOB", "QUARTZJOB").startAt(new Date(startTime))  
                            .withSchedule(simpleSchedule().withIntervalInSeconds(frequency).withRepeatCount(repeatCnt))
                            .forJob(jobDetail).withPriority(5).build();
scheduler.scheduleJob(jobDetail, smplTrg);

Quartz作业-

public class MySpringJobInvoker implements Job
{
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException
{
    JobDataMap data = jobExecutionContext.getJobDetail().getJobDataMap();
    ApplicationContext applicationContext =ApplicationContextUtil.getInstance();
    JobLauncher jobLauncher = (JobLauncher) applicationContext.getBean("jobLauncher");
    org.springframework.batch.core.Job job = (org.springframework.batch.core.Job) applicationContext.getBean(data.getString("jobName"));
    JobParameters param = new JobParametersBuilder().addString("myparam","myparam").addString(Long.toString(System.currentTimeMillis(),Long.toString(System.currentTimeMillis())).toJobParameters();
    JobExecution execution = jobLauncher.run(job, param);
}

}

Singleton类-

公共类ApplicationContextUtil{
private static ApplicationContext ApplicationContext;

public static synchronized ApplicationContext getInstance()
{
    if(applicationContext == null)
    {
        applicationContext = new ClassPathXmlApplicationContext("myjob.xml");
    }
    return applicationContext;
}

}

从Quartz传递给Spring Batch作业的参数是什么?你能发布异常堆栈跟踪吗?

如果您试图用相同的参数执行批处理的第二个实例,它将不起作用。Spring Batch根据传递的参数识别作业的唯一实例,因此作业的每个新实例都需要传递不同的参数。

最新更新