我正在寻找一种方法,当发生(特定)异常时,使Google DataFlow作业停止从Pub/Sub摄取。
来自 Pub/Sub 的事件是使用TableRowJsonCoder
通过PubsubIO.Read.Bound<TableRow>
JSON 读取的,并使用BigQueryIO.Write.Bound
. (中间有一个ParDo
,它更改了一个字段的内容,并且每天都会发生一些自定义分区,但这应该与此目的无关。
当从 PubSub 摄取的事件/行中存在不是目标 BigQuery 表中的列的字段时,DataFlow 作业会在运行时记录 IOExceptions,声称它无法插入行,但似乎确认这些消息并继续运行。
相反,我想做的是停止从 Pub/Sub 摄取消息和/或使数据流作业崩溃,以便警报可以基于最早的未确认消息的年龄。至少我想确保那些未能插入到 BigQuery 的发布/订阅消息不会被确认,以便我可以解决问题,重新启动数据流作业并再次使用这些消息。
我知道这里描述了一种处理错误输入的建议解决方案:https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow
我也知道 Apache Beam 上的这个 PR 允许插入没有违规字段的行: https://github.com/apache/beam/pull/1778
然而,就我而言,我真的不想防止错误的输入,而是防止程序员的错误,即新字段被添加到推送到 Pub/Sub 的 JSON 消息中,但相应的 DataFlow 作业没有更新。所以我并没有真正的错误数据,我只是想在程序员犯错误时崩溃,在更改消息格式之前没有部署新的数据流作业。
我认为可以(类似于博客文章解决方案)创建一个自定义ParDo
来验证每一行并抛出未捕获并导致崩溃的异常。
但理想情况下,我只想有一些配置,它不处理插入错误并记录它,而只是使作业崩溃或至少停止摄取。
你可以有一个带有 DoFn 的 ParDo,它位于 BQ 写入之前。DoFn 将负责每 X 分钟获取一次输出表架构,并验证要写入的每条记录是否与预期的输出架构匹配(如果不匹配,则引发异常)。
Old Pipeline:
PubSub -> Some Transforms -> BQ Sink
New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink
这样做的好处是,一旦有人修复了输出表架构,管道就会恢复。您需要抛出一条很好的错误消息,说明传入的 PubSub 消息出了什么问题。
或者,您可以让BQ Sink Validator
将消息输出到 PubSub DLQ(监控其大小)。在操作上,您必须更新表,然后将 DLQ 重新引入作为输入。这样做的好处是,只有错误消息会阻止管道执行。