项处理器中出现Spring Batch运行时异常



我正在学习spring-batch,并试图了解异常期间项目处理器是如何工作的。

我正在从csv文件中读取由3条记录组成的数据块,并对其进行处理并将其写入数据库。

我的csv文件

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doem
Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe

批量配置,读取3个区块中的项目,并跳过限制2

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean
public FlatFileItemReader<Person> reader() {
return new FlatFileItemReaderBuilder<Person>().name("personItemReader").resource(new ClassPathResource("sample-data.csv")).delimited()
.names(new String[] { "firstName", "lastName" }).fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {
{
setTargetType(Person.class);
}
}).build();
}
@Bean
public PersonItemProcessor processor() {
return new PersonItemProcessor();
}
@Bean
public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
return new JdbcBatchItemWriterBuilder<Person>().itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
.sql("INSERT INTO person (first_name, last_name) VALUES (:firstName, :lastName)").dataSource(dataSource).build();
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
return jobBuilderFactory.get("importUserJob").incrementer(new RunIdIncrementer()).listener(listener).flow(step1).end().build();
}
@Bean
public Step step1(JdbcBatchItemWriter<Person> writer) {
return stepBuilderFactory.get("step1").<Person, Person> chunk(3).reader(reader()).processor(processor()).writer(writer).faultTolerant().skipLimit(2)
.skip(Exception.class).build();
}
}

我正在尝试模拟一个异常,通过在我的项目处理器中手动抛出一条记录的异常

public class PersonItemProcessor implements ItemProcessor<Person, Person> {
private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
@Override
public Person process(final Person person) throws Exception {
final String firstName = person.getFirstName().toUpperCase();
final String lastName = person.getLastName().toUpperCase();
final Person transformedPerson = new Person(firstName, lastName);
log.info("Converting (" + person + ") into (" + transformedPerson + ")");
if (person.getLastName().equals("Doem"))
throw new Exception("DOOM");
return transformedPerson;
}
}

现在,根据跳过限制,当抛出异常时,项目处理器正在重新处理块,并跳过抛出错误的项目,项目写入还会将所有记录插入DB中,除了一条异常记录。

这一切都很好,因为我的处理器,它只是将小写名称转换为大写名称,并且它可以运行多次而不会产生任何影响。

但假设我的项目处理器正在调用web服务并发送数据。以及在成功调用web服务后是否引发异常。则块中的剩余数据将被再次处理(并再次调用webservice(。我不想再调用web服务,因为这就像向web服务发送重复的数据,web服务系统无法识别重复的数据。

如何处理这种情况。一个选项是不要跳过Exception,这意味着即使处理器调用了web服务,我在区块中的一条记录也不会到达项目编写器。所以这是不正确的。

其他选项块的大小应该是1,那么这在处理数千条记录时可能效率不高。

还有其他选择吗?

根据您的描述,您的项目处理器不是幂等的。然而,文档的容错部分指出,当使用容错步骤时,项目处理器应该是幂等的。以下是摘录:

如果一个步骤被配置为容错(通常通过使用跳过或重试处理(,则所使用的任何ItemProcessor都应该以幂等的方式实现。

最新更新