GCP 数据流 Apache Beam 写入输出错误处理



我需要对我的数据流应用错误处理,以便使用相同的主键多次插入 Spanner。 逻辑是,在当前消息之后可能会收到较旧的消息,我不想覆盖保存的值。 因此,我将创建我的突变作为插入,并在尝试重复插入时抛出错误。

我已经看到了DoFn中尝试块的几个示例,这些块写入端输出以记录任何错误。 这是一个非常好的解决方案,但我需要将错误处理应用于写入不包含 DoFn 的 Spanner 的步骤

spannerBranchTuples2.get(spannerOutput2)
.apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))                      
.apply("Write Spanner Records", SpannerIO.write()
.withInstanceId(options.getSpannerInstanceId())                  
.withDatabaseId(options.getSpannerDatabaseId())
.grouped());

我没有找到任何允许将错误处理应用于此步骤的文档,也没有找到将其重写为 DoFn 的方法。 任何建议如何对此应用错误处理?谢谢

数据流文档中有一个有趣的模式。

基本上,这个想法是在将结果发送到写作转换之前有一个DoFn。它看起来像这样:

final TupleTag<Output> successTag = new TupleTag<>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element());
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

outputTuple.get(deadLetterTag)
.apply(/* Write to a file or table or anything */);
outputTuple.get(successTag)
.apply(/* Write to Spanner or any other sink */);

让我知道这是否有用!

最新更新