弹簧批重复步骤结束在永无止境的循环



我有一个春季批处理工作,我想做以下…

Step 1 - 
   Tasklet - Create a list of dates, store the list of dates in the job execution context.
Step 2 - 
   JDBC Item Reader - Get list of dates from job execution context.
                      Get element(0) in dates list. Use is as input for jdbc query. 
                      Store element(0) date is job execution context 
                      Remove element(0) date from list of dates
                      Store element(0) date in job execution context                 
   Flat File Item Writer - Get element(0) date from job execution context and use for file name.
Then using a job listener repeat step 2 until no remaining dates in the list of dates.

我已经创建了作业,并且在第一次执行步骤2时工作正常。但是第二步没有像我想的那样重复。我知道这一点,因为当我调试我的代码时,它只在第2步的初始运行时中断。

尽管我知道它没有运行,但它仍然继续给我如下消息,好像它正在运行第2步。

2016-08-10 22:20:57.842  INFO 11784 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Duplicate step [readStgDbAndExportMasterListStep] detected in execution of job=[exportMasterListCsv]. If either step fails, both will be executed again on restart.
2016-08-10 22:20:57.846  INFO 11784 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [readStgDbAndExportMasterListStep]

这将在一个永不结束的循环中结束。

有人能帮我弄清楚或给出一个建议,为什么我的风格2只运行一次?

thanks in advance

我为我的代码添加了两个链接到PasteBin,以免污染这篇文章。

http://pastebin.com/QhExNikm (Job Config)

http://pastebin.com/sscKKWRk (Common Job Config)

http://pastebin.com/Nn74zTpS(步骤执行监听器)

我从您的问题和代码中推断,根据您检索的日期数量(这发生在实际作业开始之前),您将执行一个步骤的日期数量。

我建议改变设计。创建一个java类,以列表的形式获取日期,并根据该列表动态创建步骤。像这样:

@EnableBatchProcessing
public class JobConfig {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;  
    @Autowired
    private JobDatesCreator jobDatesCreator;
    @Bean
    public Job executeMyJob() {
        List<Step> steps = new ArrayList<Step>();
        for (String date : jobDatesCreator.getDates()) {
            steps.add(createStep(date));
        }
        return jobBuilderFactory.get("executeMyJob")
                .start(createParallelFlow(steps))
                .end()
                .build();       
    }
    private Step createStep(String date){
        return stepBuilderFactory.get("readStgDbAndExportMasterListStep" + date)
                .chunk(your_chunksize)
                .reader(your_reader)
                .processor(your_processor)
                .writer(your_writer)                                
                .build();       
    }   
    private Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        // max multithreading = -1, no multithreading = 1, smart size = steps.size()
        taskExecutor.setConcurrencyLimit(1); 
        List<Flow> flows = steps.stream()
                .map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
                .collect(Collectors.toList());
        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
                .split(taskExecutor)
                .add(flows.toArray(new Flow[flows.size()]))
                .build();
    }  
}

EDIT:添加了"jobParameter"输入(方法也略有不同)

在类路径的某个地方添加以下示例.properties文件:

sql.statement="select * from awesome"

并将以下注释添加到您的JobDatesCreator类

@PropertySource("classpath:example.properties")

您也可以提供特定的sql语句作为命令行参数。来自spring文档:

可以使用特定的命令行开关启动(例如Java -jar)app.jar - name = "春天").

更多信息请参见http://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html

获取日期的类(为什么要使用微线程呢?):

@PropertySource("classpath:example.properties")
public class JobDatesCreator {
    @Value("${sql.statement}")
    private String sqlStatement;
    @Autowired
    private CommonExportFromStagingDbJobConfig commonJobConfig; 
    private List<String> dates; 
    @PostConstruct
    private void init(){
        // Execute your logic here for getting the data you need.
        JdbcTemplate jdbcTemplate = new JdbcTemplate(commonJobConfig.onlineStagingDb);
        // acces to your sql statement provided in a property file or as a command line argument
        System.out.println("This is the sql statement I provided in my external property: " + sqlStatement);
        // for now..
        dates = new ArrayList<>();
        dates.add("date 1");
        dates.add("date 2");
    }
    public List<String> getDates() {
        return dates;
    }
    public void setDates(List<String> dates) {
        this.dates = dates;
    }
}

我还注意到你有很多重复的代码,你可以很容易地重构。现在,对于每个作者,您都有如下内容:

@Bean
public FlatFileItemWriter<MasterList> division10MasterListFileWriter() {
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(new File(outDir, MerchHierarchyConstants.DIVISION_NO_10 )));
    writer.setHeaderCallback(masterListFlatFileHeaderCallback());
    writer.setLineAggregator(masterListFormatterLineAggregator());
    return writer;
}

考虑使用这样的语句:

public FlatFileItemWriter<MasterList> divisionMasterListFileWriter(String divisionNumber) {
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(new File(outDir, divisionNumber )));
    writer.setHeaderCallback(masterListFlatFileHeaderCallback());
    writer.setLineAggregator(masterListFormatterLineAggregator());
    return writer;
}

由于不是所有的代码都可以正确地复制您的问题,因此此答案是解决您的问题的建议/指示。

基于我们对Spring批处理在微线程中执行动态生成的步骤的讨论,我试图回答有关如何在作业实际执行之前访问jobParameter的问题。

我假设有restcall来执行批处理。一般来说,这需要采取以下步骤。1. 接收rest调用及其参数的一段代码2. 创建一个新的springcontext(有几种方法可以重用现有的context并再次启动作业,但是在重用步骤、读取器和写入器时存在一些问题)3.启动作业

最简单的解决方案是将从服务接收到的jobparameter存储为系统属性,然后在步骤3中构建作业时访问该属性。但是,如果有多个用户同时启动该作业,这可能会导致问题。

还有其他方法可以在加载springcontext时将参数传递给它。但这取决于你设置情境的方式。例如,如果在第2步中直接使用SpringBoot,则可以编写如下方法:

private int startJob(Properties jobParamsAsProps) {
  SpringApplication springApp = new SpringApplication(.. my config classes ..);
  springApp.setDefaultProperties(jobParamsAsProps);
  ConfigurableApplicationContext context = springApp.run();
  ExitCodeGenerator exitCodeGen = context.getBean(ExitCodeGenerator.class);
  int code = exitCodeGen.getExitCode();
  context.close();
  return cod;
}
这样,您就可以使用标准的Value-或ConfigurationProperties注释正常访问属性。

最新更新