Spring Batch:两个作业使用 FlatFileItemWriter 和 ClassifierCompositeItemWriter



问题

我需要创建一个包含两个作业的 Spring Batch 项目,这两个作业既可以单独执行,也可以一起执行。每个作业都包含从数据库读取数据、并使用 FlatFileItemWriter 和 ClassifierCompositeItemWriter 写入文件所需的代码。我发现,如果单独执行作业(-Dspring.batch.job.names=schoolJob 或 -Dspring.batch.job.names=studentJob),文件可以正常生成;但当两个作业一起执行时(-Dspring.batch.job.names=schoolJob,studentJob),作业生成的文件里只有页眉和页脚。似乎哪里出了问题,但我找不到原因。

某些代码

  • 批量配置、作业和步骤
@Configuration
@EnableBatchProcessing
@SuppressWarnings("rawtypes, unchecked")
public class MyJobConfiguration
{
// Factory used by the job bean methods of this configuration.
@Autowired
private JobBuilderFactory jobBuilderFactory;
// Factory used by the step bean methods of this configuration.
@Autowired
private StepBuilderFactory stepBuilderFactory;
// Passed to the writer-registration tasklets, which query the distinct group ids with it.
@Autowired
private JdbcTemplate jdbcTemplate;
// Used to look up FlatFileItemWriter beans and passed to the tasklets that register them.
@Autowired
private ConfigurableApplicationContext applicationContext;
/**
 * First step of the student job: a tasklet step that registers the per-group student
 * writer beans in the application context.
 *
 * @return the tasklet step named {@code calculateDistinctValuesAndRegisterStudentWriters}
 */
@Bean
public Step studentStep1() {
    DynamicStudentWritersConfigurationTasklet registerWriters =
            new DynamicStudentWritersConfigurationTasklet(jdbcTemplate, applicationContext);
    return stepBuilderFactory
            .get("calculateDistinctValuesAndRegisterStudentWriters")
            .tasklet(registerWriters)
            .build();
}
/**
 * First step of the school job: a tasklet step that registers the per-group school
 * writer beans in the application context.
 *
 * @return the tasklet step named {@code calculateDistinctValuesAndRegisterSchoolWriters}
 */
@Bean
public Step schoolStep1() {
    DynamicSchoolWritersConfigurationTasklet registerWriters =
            new DynamicSchoolWritersConfigurationTasklet(jdbcTemplate, applicationContext);
    return stepBuilderFactory
            .get("calculateDistinctValuesAndRegisterSchoolWriters")
            .tasklet(registerWriters)
            .build();
}
/**
 * Second step of the student job: reads students in chunks of two and writes each one
 * through the classifier writer.
 *
 * <p>The step is job-scoped, presumably so that it is built only after the first step has
 * registered the dynamic writer beans it looks up here.
 *
 * <p>Only the writers registered for this job (bean names starting with
 * {@code student-group}) are registered as streams. Registering every
 * {@link FlatFileItemWriter} in the context also opened and closed the school job's
 * writers when both jobs ran in the same context, which rewrote those files with nothing
 * but their header and footer.
 *
 * @param reader the student reader
 * @param writer the composite writer that routes each student to its group writer
 * @return the chunk-oriented step named {@code readWriteStudents}
 */
@Bean
@JobScope
public Step studentStep2(StudentReader reader,
        @Qualifier("studentClassfierItemWriter")
        ClassifierCompositeItemWriter<Student> writer) {
    SimpleStepBuilder<Student, Student> stepBuilder = stepBuilderFactory.get("readWriteStudents")
            .<Student, Student>chunk(2)
            .reader(reader)
            .writer(writer);
    Map<String, FlatFileItemWriter> writersByBeanName =
            applicationContext.getBeansOfType(FlatFileItemWriter.class);
    for (Map.Entry<String, FlatFileItemWriter> entry : writersByBeanName.entrySet()) {
        // Skip writers that belong to other jobs sharing this application context.
        if (entry.getKey().startsWith("student-group")) {
            // The delegates must be registered explicitly so the step opens and closes them.
            stepBuilder.stream(entry.getValue());
        }
    }
    return stepBuilder.build();
}
/**
 * Second step of the school job: reads schools in chunks of two and writes each one
 * through the classifier writer.
 *
 * <p>The step is job-scoped, presumably so that it is built only after the first step has
 * registered the dynamic writer beans it looks up here.
 *
 * <p>Only the writers registered for this job (bean names starting with
 * {@code school-group}) are registered as streams. Registering every
 * {@link FlatFileItemWriter} in the context also opened and closed the student job's
 * writers when both jobs ran in the same context, which rewrote those files with nothing
 * but their header and footer.
 *
 * @param reader the school reader
 * @param writer the composite writer that routes each school to its group writer
 * @return the chunk-oriented step named {@code readWriteSchools}
 */
@Bean
@JobScope
public Step schoolStep2(SchoolReader reader,
        @Qualifier("schoolClassfierItemWriter")
        ClassifierCompositeItemWriter<School> writer) {
    SimpleStepBuilder<School, School> stepBuilder = stepBuilderFactory.get("readWriteSchools")
            .<School, School>chunk(2)
            .reader(reader)
            .writer(writer);
    Map<String, FlatFileItemWriter> writersByBeanName =
            applicationContext.getBeansOfType(FlatFileItemWriter.class);
    for (Map.Entry<String, FlatFileItemWriter> entry : writersByBeanName.entrySet()) {
        // Skip writers that belong to other jobs sharing this application context.
        if (entry.getKey().startsWith("school-group")) {
            // The delegates must be registered explicitly so the step opens and closes them.
            stepBuilder.stream(entry.getValue());
        }
    }
    return stepBuilder.build();
}
/**
 * Defines the {@code studentJob}: writer registration followed by the read/write step.
 *
 * <p>NOTE(review): several {@code Step} beans exist in this configuration, so the
 * parameter names presumably select the beans by name; do not rename them.
 *
 * @param studentStep1 step that registers the student writers
 * @param studentStep2 step that reads and writes the students
 * @return the assembled job
 */
@Bean
public Job studentJob(Step studentStep1, Step studentStep2) {
    return jobBuilderFactory
            .get("studentJob")
            .start(studentStep1)
            .next(studentStep2)
            .build();
}
/**
 * Defines the {@code schoolJob}: writer registration followed by the read/write step.
 *
 * <p>NOTE(review): several {@code Step} beans exist in this configuration, so the
 * parameter names presumably select the beans by name; do not rename them.
 *
 * @param schoolStep1 step that registers the school writers
 * @param schoolStep2 step that reads and writes the schools
 * @return the assembled job
 */
@Bean
public Job schoolJob(Step schoolStep1, Step schoolStep2) {
    return jobBuilderFactory
            .get("schoolJob")
            .start(schoolStep1)
            .next(schoolStep2)
            .build();
}
  • 数据源配置
/**
 * Declares the embedded database used by the batch jobs.
 */
@Configuration
class DatasourceConfig
{
    /**
     * Builds an embedded H2 database, adding the Spring Batch H2 schema script first and
     * the {@code data.sql} script second.
     *
     * @return the embedded data source
     */
    @Bean
    public DataSource dataSource()
    {
        EmbeddedDatabaseBuilder builder = new EmbeddedDatabaseBuilder();
        builder.setType(EmbeddedDatabaseType.H2);
        builder.addScript("/org/springframework/batch/core/schema-h2.sql");
        builder.addScript("data.sql");
        return builder.build();
    }
}
  • 读取器(Reader)
/**
 * JDBC cursor reader that maps every row of the {@code school} table to a {@link School}.
 */
@Component
class SchoolReader extends JdbcCursorItemReader<School>
{
    private static final String READER_NAME = "schoolItemReader";
    private static final String QUERY       = "select * from school";

    @Autowired
    private DataSource dataSource;

    /**
     * Sets the reader name, data source, SQL and row mapper, then delegates to the
     * superclass implementation.
     */
    @Override
    public void afterPropertiesSet() throws Exception
    {
        setName(READER_NAME);
        setDataSource(dataSource);
        setSql(QUERY);
        setRowMapper(new BeanPropertyRowMapper<>(School.class));
        super.afterPropertiesSet();
    }
}
/**
 * JDBC cursor reader that maps every row of the {@code student} table to a {@link Student}.
 */
@Component
class StudentReader extends JdbcCursorItemReader<Student>
{
    private static final String READER_NAME = "studentItemReader";
    private static final String QUERY       = "select * from student";

    @Autowired
    private DataSource dataSource;

    /**
     * Sets the reader name, data source, SQL and row mapper, then delegates to the
     * superclass implementation.
     */
    @Override
    public void afterPropertiesSet() throws Exception
    {
        setName(READER_NAME);
        setDataSource(dataSource);
        setSql(QUERY);
        setRowMapper(new BeanPropertyRowMapper<>(Student.class));
        super.afterPropertiesSet();
    }
}
  • 写入器(Writer)
/**
 * Declares the composite writer that routes each {@link School} to the flat-file writer
 * registered for its group.
 */
@Configuration
public class SchoolWriter
{
    @Autowired
    private ConfigurableApplicationContext applicationContext;

    /**
     * Builds the step-scoped classifier writer for schools. The available
     * {@link FlatFileItemWriter} beans are looked up once when this bean is created, and
     * each school is routed to the bean named {@code school-group<groupId>Writer}.
     *
     * @return the composite writer for schools
     */
    @Bean(name = "schoolClassfierItemWriter")
    @StepScope
    public ClassifierCompositeItemWriter<School> itemWriter()
    {
        Map<String, FlatFileItemWriter> writersByBeanName =
                applicationContext.getBeansOfType(FlatFileItemWriter.class);
        Classifier<School, FlatFileItemWriter<School>> byGroup = item -> {
            String beanName = "school-group" + item.getGroupId() + "Writer";
            return writersByBeanName.get(beanName);
        };
        ClassifierCompositeItemWriterBuilder builder = new ClassifierCompositeItemWriterBuilder();
        builder.classifier(byGroup);
        return builder.build();
    }
}
/**
 * Declares the composite writer that routes each {@link Student} to the flat-file writer
 * registered for its group.
 */
@Configuration
public class StudentWriter
{
    @Autowired
    private ConfigurableApplicationContext applicationContext;

    /**
     * Builds the step-scoped classifier writer for students. The available
     * {@link FlatFileItemWriter} beans are looked up once when this bean is created, and
     * each student is routed to the bean named {@code student-group<groupId>Writer}.
     *
     * @return the composite writer for students
     */
    @Bean(name = "studentClassfierItemWriter")
    @StepScope
    public ClassifierCompositeItemWriter<Student> itemWriter()
    {
        Map<String, FlatFileItemWriter> writersByBeanName =
                applicationContext.getBeansOfType(FlatFileItemWriter.class);
        Classifier<Student, FlatFileItemWriter<Student>> byGroup = item -> {
            String beanName = "student-group" + item.getGroupId() + "Writer";
            return writersByBeanName.get(beanName);
        };
        ClassifierCompositeItemWriterBuilder builder = new ClassifierCompositeItemWriterBuilder();
        builder.classifier(byGroup);
        return builder.build();
    }
}
  • Tasklet(任务单元)
/**
 * Tasklet that registers one {@link FlatFileItemWriter} bean definition for every distinct
 * {@code groupId} found in the {@code school} table.
 *
 * <p>Each writer is registered under the bean name {@code school-group<groupId>Writer} and
 * writes to {@code school-<groupId>.txt} with a fixed header and footer line.
 */
class DynamicSchoolWritersConfigurationTasklet implements Tasklet
{
    private final JdbcTemplate                   jdbcTemplate;
    private final ConfigurableApplicationContext applicationContext;

    /**
     * @param jdbcTemplate       used to query the distinct group ids
     * @param applicationContext context whose bean factory receives the writer definitions
     */
    public DynamicSchoolWritersConfigurationTasklet(JdbcTemplate jdbcTemplate,
            ConfigurableApplicationContext applicationContext)
    {
        this.jdbcTemplate       = jdbcTemplate;
        this.applicationContext = applicationContext;
    }

    /**
     * Queries the distinct school groups and registers a writer bean definition for each.
     * A definition left over from an earlier execution in the same context is removed
     * first, so re-running the step does not fail when bean-definition overriding is
     * disabled.
     *
     * @return {@link RepeatStatus#FINISHED} always; the tasklet runs once per step execution
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
    {
        // The bean factory is cast to the registry interface once, outside the loop.
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        String        sql    = "select distinct(groupId) from school";
        List<Integer> groups = jdbcTemplate.queryForList(sql, Integer.class);
        for (Integer group : groups)
        {
            String name = "school-group" + group + "Writer";
            //@f:off
            MutablePropertyValues propertyValues = new MutablePropertyValues();
            propertyValues.addPropertyValue("name", name);
            propertyValues.addPropertyValue("lineAggregator", new PassThroughLineAggregator<>());
            propertyValues.addPropertyValue("resource", new FileSystemResource("school-" + group + ".txt"));
            propertyValues.addPropertyValue("headerCallback", (FlatFileHeaderCallback) writer -> writer.write("header-school"));
            propertyValues.addPropertyValue("footerCallback", (FlatFileFooterCallback) writer -> writer.write("footer-school"));
            //@f:on
            GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
            beanDefinition.setBeanClassName(FlatFileItemWriter.class.getName());
            beanDefinition.setPropertyValues(propertyValues);
            // Replace a stale definition explicitly instead of relying on overriding being allowed.
            if (registry.containsBeanDefinition(name))
            {
                registry.removeBeanDefinition(name);
            }
            registry.registerBeanDefinition(name, beanDefinition);
        }
        return RepeatStatus.FINISHED;
    }
}
/**
 * Tasklet that registers one {@link FlatFileItemWriter} bean definition for every distinct
 * {@code groupId} found in the {@code student} table.
 *
 * <p>Each writer is registered under the bean name {@code student-group<groupId>Writer} and
 * writes to {@code student-<groupId>.txt} with a fixed header and footer line.
 */
class DynamicStudentWritersConfigurationTasklet implements Tasklet
{
    private final JdbcTemplate                   jdbcTemplate;
    private final ConfigurableApplicationContext applicationContext;

    /**
     * @param jdbcTemplate       used to query the distinct group ids
     * @param applicationContext context whose bean factory receives the writer definitions
     */
    public DynamicStudentWritersConfigurationTasklet(JdbcTemplate jdbcTemplate,
            ConfigurableApplicationContext applicationContext)
    {
        this.jdbcTemplate       = jdbcTemplate;
        this.applicationContext = applicationContext;
    }

    /**
     * Queries the distinct student groups and registers a writer bean definition for each.
     * A definition left over from an earlier execution in the same context is removed
     * first, so re-running the step does not fail when bean-definition overriding is
     * disabled.
     *
     * @return {@link RepeatStatus#FINISHED} always; the tasklet runs once per step execution
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
    {
        // The bean factory is cast to the registry interface once, outside the loop.
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) applicationContext.getBeanFactory();
        String        sql    = "select distinct(groupId) from student";
        List<Integer> groups = jdbcTemplate.queryForList(sql, Integer.class);
        for (Integer group : groups)
        {
            String name = "student-group" + group + "Writer";
            //@f:off
            MutablePropertyValues propertyValues = new MutablePropertyValues();
            propertyValues.addPropertyValue("name", name);
            propertyValues.addPropertyValue("lineAggregator", new PassThroughLineAggregator<>());
            propertyValues.addPropertyValue("resource", new FileSystemResource("student-" + group + ".txt"));
            propertyValues.addPropertyValue("headerCallback", (FlatFileHeaderCallback) writer -> writer.write("header-student"));
            propertyValues.addPropertyValue("footerCallback", (FlatFileFooterCallback) writer -> writer.write("footer-student"));
            //@f:on
            GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
            beanDefinition.setBeanClassName(FlatFileItemWriter.class.getName());
            beanDefinition.setPropertyValues(propertyValues);
            // Replace a stale definition explicitly instead of relying on overriding being allowed.
            if (registry.containsBeanDefinition(name))
            {
                registry.removeBeanDefinition(name);
            }
            registry.registerBeanDefinition(name, beanDefinition);
        }
        return RepeatStatus.FINISHED;
    }
}
  • DAO
/**
 * Plain data holder for one row of the {@code school} table.
 *
 * <p>Accessors, constructors and {@code toString} come from the Lombok annotations below.
 * NOTE(review): the field declaration order presumably determines the all-args
 * constructor's parameter order and the {@code toString} layout, so keep it stable.
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class School
{
// Identifier of the school.
private int    id;
// Name of the school.
private String name;
// Group of the school; selects the "school-group<groupId>Writer" bean used to write it.
private int    groupId;
}
/**
 * Plain data holder for one row of the {@code student} table.
 *
 * <p>Accessors, constructors and {@code toString} come from the Lombok annotations below.
 * NOTE(review): the field declaration order presumably determines the all-args
 * constructor's parameter order and the {@code toString} layout, so keep it stable.
 */
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Student
{
// Identifier of the student.
private int    id;
// Name of the student.
private String name;
// Group of the student; selects the "student-group<groupId>Writer" bean used to write it.
private int    groupId;
}

这与 https://stackoverflow.com/a/67635289/5019386 中的情况类似。我认为你还需要把动态注册的 ItemWriter 设为步骤作用域(step scope),例如:

propertyValues.addPropertyValue("scope", "step");

请注意,我没有尝试过。也就是说,我真的建议你的应用程序做一件事并做好它,即隔离作业定义并分别打包/运行每个作业。

最新更新