在从Pub/Sub写入BigQuery的Google Cloud数据流管道中,是否可以捕获丢失的数据集java.lang



我正在尝试处理数据流作业试图动态写入BigQuery表目的地的错误。

我想捕捉以下异常:

java.lang.RuntimeException:无法获取数据集的数据集example_项目中的数据集example_项目

以便创建数据集,然后重试写入BigQuery。

有可能以这种方式捕获异常吗?如果有,你知道我需要在代码中添加try/catch逻辑吗?

您不能使用try-catch块来处理这种情况,因为这是一个内部BQ api错误。相反,我建议您编写"重试瞬态"策略并设置错误类型。通过这种方式,您可以将BigQuery写入错误结果存储在PCollection中,然后根据需要转储该记录。请参考下面的片段来实现相同的目的。

WriteResult result = formattedData.get(successRows).setCoder(TableRowJsonCoder.of()).apply("BQ SteamingInserts",
BigQueryIO.writeTableRows().withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
.to("audit.db_audit")
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()).withoutValidation()
.withExtendedErrorInfo());

使用上面的代码片段,如果某个操作由于ddl操作而失败,则数据将存储在WriteResult中。

PCollection<String> failedInserts = result.getFailedInsertsWithErr().apply("BQErrorToTableRow",
ParDo.of(new BQErrorToString()));

您可以使用上面的代码片段获取失败的记录。如果有帮助,请告诉我:(

不存在的BigQuery数据集和/或表将被无限期重试,并可能导致管道阻塞。BigQueryIO没有一个可配置的选项来自动创建不存在的BigQuery数据集,它只有一个创建不存在BigQuery表的选项,但指定的数据集资源必须存在或在调用writing to table代码之前创建。

我还在Beam文档中发现,

要写入的数据集必须已经存在

请参阅官方文档,了解如何在云数据流中处理Java异常,并参阅示例。

Dataflow服务在批处理模式下重试失败任务最多4次,在流模式下重试次数不受限制。在批处理模式下,您的作业将失败,而在流媒体模式下,它可能会无限期地暂停。

我希望它能有所帮助。

相关内容

最新更新