如何覆盖Spring Batch CompositeItemWriter在出现异常时为委托编写器管理事务



我正在扩展这个 Spring Batch CompositeItemWriter 如何管理委托编写器的事务? 这里的问题:

就我而言,我有一个以下CompositeItemWriter将数据写入同一数据库的多个表中,在写入数据之前,它通过实施各种业务规则来转换数据。在这里,一条记录可能满足不同的业务规则等。因此,一个作家可能比其他作家获得更多的数据。

@Bean
public CompositeItemWriter<Employee> EmployeeCompositeWriter() throws Exception {
List<ItemWriter<? super Employee>> employee = new ArrayList<>();
employee.add(employeeWriter());
employee.add(departmentWriter());
employee.add(stockWriter());
employee.add(purchaseWriter());
CompositeItemWriter<Employee> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(employee);
compositeItemWriter.afterPropertiesSet();
return compositeItemWriter;
}

场景- 假设第一个写入器工作得很好,第二个写入器生成异常,然后第三个和第四个写入器不会被调用,这是由于事务回滚而在 Spring 批处理中默认的Automic性质。

在这里,即使第 2 个写入器出现任何异常,我也想成功调用第 3 个和第 4 个写入器并保存数据,我还想成功保存第 1 个写入器和第 2 个写入器的数据.. 只有异常数据我想在SkipListener的帮助下存储到错误表中,以识别哪些记录是垃圾或垃圾。

解决方案 - 为了实现上述情况,我们在每个写入器写入方法上添加了@Transactional(propagation = Propagation.REQUIRES_NEW),第一个写入器现在保存了数据,第二个写入器生成异常(使用namedJdbcTemplate.batchUpdate()批量更新数据)我们正在缓存它并重新抛出它,但我们可以看到提交级别降低到 1(偏离过程以识别 extact 垃圾记录),并且第二个写入器再次出现异常,第一个写入器被调用并且它正在保存重复数据并且第 2 位、第 3 位和第 4 位作家被调用,但该垃圾记录也没有流向第 3 位和第 4 位作家。

在这里,我不希望整个批处理作业在单个或几个记录是垃圾时停止,因为此作业对于我们每次运行都至关重要。如果我们可以保存不会出现异常的所有数据,并且仅在SkipListener的帮助下(如果可能)或任何其他方式将异常数据保存到错误表中,有什么办法吗?

如果我们可以将任何步骤的批处理组件(读取器或处理器)部分重用到另一个步骤中,有什么方法吗?

我看不出有什么方法可以将 spring-batch 的单个事务用于将整个块写入为原子与你的想法保持一致,只要你想要skiplistener,就保持单个作家的原子性。

我不确定这是否可能,但也许您将能够快速测试它。这就是消息在某些集成框架(如从一个处理器到错误处理流的 Camel)中携带异常的方式。

  • 项目读取器应返回一个EmployeeWrapper其中包含employee记录,并具有用于存储异常的字段。

  • 您的 CompositeItemWriter 接收List<EmployeeWrapper>,复合编写器有 5 个写入器而不是 4 个。第 5 位作家将做您的SkipListener会做的事情。

List<ItemWriter<? super EmployeeWrapper>> employee = new ArrayList<>();
employee.add(employeeWriter());
employee.add(departmentWriter());
employee.add(stockWriter());
employee.add(purchaseWriter());
employee.add(errorRecordWriter());
  • 您的前 4 个单独的编写器从不抛出异常,而是将其标记为已处理,而是将捕获的异常添加为 EmployeeWrapper 的属性。

  • 您的第 5errorRecordWriter接收所有记录,检查任何添加了异常属性的记录并将它们写入错误表。如果它未能写入错误记录,您可以抛出异常,所有 5 个编写器都将重试。

  • 关于在批量更新失败时如何知道哪个记录是错误记录。似乎当块中发生错误时,Spring 会回滚块并开始在该块中逐条记录重试,以便它知道哪个记录有问题。因此,您可以在个人作家中做同样的事情。即捕获批量更新异常,然后逐个重试以分离错误记录

这里有几件事:

  1. 不要将@Transactional与 Spring Batch 一起使用 - Spring Batch为您管理事务,因此使用该注释会导致问题。不要使用它。
  2. 自己管理异常 -在所描述的方案中,你想要为同一项调用四个ItemWriter实现,但想要跳过委派ItemWriter级别的异常,你将需要编写自己的CompositeItemWriter实现。Spring Batch 出于方便起见提供了这种组合级别(我们使用相同的项目委托给每个ItemWriter实现),但从框架的角度来看,它只是一个ItemWriter.为了在子ItemWriter级别处理异常,您需要编写自己的包装器并自行管理异常。

更新:
我所指的自定义ItemWriter的示例实现(请注意,下面的代码未经测试):

public class MyCompositeItemWriter<T> implements ItemWriter<T> {
private List<ItemWriter<? super T>> delegates;

@Override
public void write(List<? extends T> items) throws Exception {
for(ItemWriter delegate : delegates) {
try {
delegate.write(items);
}
catch (Exception e) {
// Do logging/error handling here
}
}
}
@Override
public void setDelegates(List<ItemWriter<? super T>> delegates) {
super.setDelegates(delegates);
this.delegates = delegates;
}
}

问题的主要根本原因是,我们尝试使用两种不同的ItemWriter将数据写入同一个表中,这导致事务行为异常。

我们已经实现了SkipListenets(考虑到使用可能不会经常获取垃圾或垃圾数据,因为我们在初始数据加载时执行验证。

由于我们已经在批处理作业中实现了"春季批处理跳过技术">,这有助于我们指定某些异常类型和跳过项目的最大编号,并且每当抛出其中一个可跳过的异常时,批处理作业不会失败,而是跳过该特定项目并转到下一个项目。只有当最大没有。到达跳过的项目数,批处理作业将失败。 我们使用跳过逻辑和 Spring Batch 的"容错">功能应用于面向块的步骤中的项目,而不是整个步骤。

因此,如果 Item 未能在一个委托处写入,那么对于所有其他委托,它将被视为失败(该项目不会传递给另一个委托),我们对此很好,因为我们正在捕获错误日志表中的详细信息,我们可以在需要时重新处理它。

最新更新