JSR 352 :如何从分区步骤的每个分区的写入器收集数据?

  • 本文关键字:分区 数据 JSR jsr352 java-batch
  • 更新时间 :
  • 英文 :


所以,我在一个写入数据库的步骤中有 2 个分区。我想记录每个分区写入的行数,得到总和,然后打印到日志中;

我正在考虑在编写器中使用static变量,并使用步骤上下文/作业上下文将其afterStep()步骤侦听器。但是当我尝试时,我得到了null.我能够在读者close()获得这些值。

这是正确的方法吗?还是应该使用分区收集器/缩减器/分析器?

我在 Websphere Liberty 中使用 java 批处理。我正在Eclipse中开发。

我正在考虑在编写器中使用静态变量,并使用步骤上下文/作业上下文将其放入步骤侦听器的 afterStep() 中。但是当我尝试它时,我得到了空。

此时,ItemWriter可能已经被销毁了,但我不确定。

这是正确的方法吗?

是的,它应该足够好。但是,您需要确保共享所有分区的总行计数,因为批处理运行时为每个分区维护一个StepContext克隆。你应该使用JobContext.

我认为使用PartitionCollectorPartitionAnalyzer也是一个不错的选择。接口分区收集器有一个方法collectPartitionData()来收集来自其分区的数据。收集后,批处理运行时将此数据传递给分区分析器以分析数据。请注意,有

  • 每步 N 个分区收集器(每个分区 1 个)
  • 每步 N 步上下文(每个分区 1 个)
  • 每步 1 个分区分析器

写入的记录可以通过StepContexttransientUserData传递。由于StepContext是为自己的步骤分区保留的,因此暂时性用户数据不会被其他分区覆盖。


这是实现:

我的条目作家

@Inject
private StepContext stepContext;
@Override
public void writeItems(List<Object> items) throws Exception {
// ...
Object userData = stepContext.getTransientUserData();
stepContext.setTransientUserData(partRowCount);
}

MyPartitionCollector

@Inject
private StepContext stepContext;
@Override
public Serializable collectPartitionData() throws Exception {
// get transient user data
Object userData = stepContext.getTransientUserData();
int partRowCount = userData != null ? (int) userData : 0;
return partRowCount;
}

MyPartitionAnalyzer

private int rowCount = 0;
@Override
public void analyzeCollectorData(Serializable fromCollector) throws Exception {
rowCount += (int) fromCollector;
System.out.printf("%d rows processed (all partitions).%n", rowCount);
}

参考 : JSR352 v1.0 最终版本.pdf

让我对接受的答案提供一些替代方案并添加一些评论。

分区分析器变体 - 使用 analyzeStatus() 方法

另一种技术是使用analyzeStatus它仅在每个整个分区结束时调用,并传递分区级退出状态。

public void analyzeStatus(BatchStatus batchStatus, String exitStatus) 

相比之下,上述使用analyzeCollectorData的答案在每个分区上的每个块的末尾被调用。

例如

public class MyItemWriteListener extends AbstractItemWriteListener {
@Inject
StepContext stepCtx;
@Override
public void afterWrite(List<Object> items) throws Exception {
// update 'newCount' based on items.size()
stepCtx.setExitStatus(Integer.toString(newCount));
}

显然,这仅在您不将退出状态用于其他目的时才有效。 您可以设置任何工件的退出状态(尽管这种自由可能是必须跟踪的另一件事)。

评论

该 API 旨在促进跨 JVM 调度单个分区的实现(例如,在 Liberty 中,您可以在此处看到这一点。 但是使用静态会将您绑定到单个 JVM,因此不建议使用此方法。

另请注意,JobContextStepContext都是以我们在批处理中看到的类似"线程本地"的方式实现的。

相关内容

  • 没有找到相关文章

最新更新