在从数据流插入 BigQuery 之前验证行



根据从数据流加载 Bigquery 表时如何设置maximum_bad_records?目前无法在将数据从数据流加载到 BigQuery 时设置maxBadRecords配置。建议先验证数据流作业中的行,然后再将它们插入 BigQuery。

如果我有TableSchemaTableRow,我该如何确保该行可以安全地插入表中?

一定有一种更简单的方法可以做到这一点,而不是遍历架构中的字段,查看它们的类型并查看行中值的类,对吗?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败。

更新:

我的用例是一个 ETL 作业,它首先将在 Cloud Storage 上的 JSON(每行一个对象(日志上运行并批量写入 BigQuery,但后来将从 PubSub 读取对象并连续写入 BigQuery。这些对象包含许多在 BigQuery 中不需要的信息,还包含甚至无法在架构中描述的部分(基本上是自由格式的 JSON 有效负载(。时间戳等内容也需要格式化才能与 BigQuery 配合使用。此作业将有一些变体在不同的输入上运行并写入不同的表。

从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100(,格式化其中一些并将对象输出到BigQuery。我或多或少只是遍历属性名称列表,从源对象中提取值,查看配置以查看是否应该以某种方式格式化属性,必要时应用格式(这可能是缩小大小写,将毫秒时间戳除以 1000,从 URL 中提取主机名等(,并将值写入TableRow对象。

我的问题是数据很混乱。有几亿个物体,有一些看起来不像预期的那样,这是罕见的,但有了这些数量,罕见的事情仍然会发生。有时,应包含字符串的属性包含整数,反之亦然。有时有一个数组或一个对象,其中应该有一个字符串。

理想情况下,我想拿起我的TableRow,把它递给TableSchema,问"这行得通吗?

由于这是不可能的,所以我所做的是查看TableSchema对象并尝试自己验证/转换值。如果TableSchema说属性的类型是STRING我会在将其添加到TableRow之前运行value.toString()。如果是INTEGER我会检查它是IntegerLongBigInteger,依此类推。这种方法的问题在于,我只是猜测什么在 BigQuery 中会起作用。它将接受哪些Java数据类型FLOAT?为了TIMESTAMP?我认为我的验证/转换捕获了大多数问题,但总有例外和边缘情况。

根据我的经验,这是非常有限的,如果单行未通过 BigQuery 的验证,整个工作流(作业?工作流?不确定正确的术语(就会失败(就像常规加载一样,除非maxBadRecords设置为足够大的数字(。它还会失败,并显示表面上有用的消息,例如"BigQuery 导入作业"dataflow_job_xxx"失败。原因:(5db0b2cdab1557e0(:项目"xxx"中的 BigQuery 作业"dataflow_job_xxx"已完成,但出现错误:错误结果:为非记录字段指定 JSON 映射,错误:为非记录字段指定 JSON 映射,错误:为非记录字段指定 JSON 映射,错误:为非记录字段指定 JSON 映射,错误:为非记录字段指定 JSON 映射".也许在某个地方可以看到更详细的错误消息,可以告诉我它是哪个属性以及值是什么?如果没有这些信息,它也可以说"坏数据"。

据我所知,至少在批处理模式下运行时,数据流会将TableRow对象写入云存储中的暂存区域,然后在一切就绪后开始加载。这意味着我无处可捕获任何错误,加载 BigQuery 时我的代码不再运行。我还没有在流媒体模式下运行任何作业,但我不确定那里会有什么不同,从我(诚然有限(的理解来看,基本原理是相同的,只是批量大小更小。

人们使用 Dataflow 和 BigQuery,因此,如果不总是担心整个管道会因为单个错误的输入而停止,就不可能完成这项工作。人们是怎么做到的?

我假设您将文件中的 JSON 反序列化为Map<String, Object>.然后你应该能够用TableSchema递归地进行类型检查。

我建议使用迭代方法来开发架构验证,包括以下两个步骤。

  1. 编写一个将 JSON 行转换为TableRow对象的PTransform<Map<String, Object>, TableRow>TableSchema也应该是函数的构造函数参数。你可以从使这个函数变得非常严格开始 - 要求JSON直接将输入解析为整数,例如,当找到BigQuery INTEGER模式时 - 并积极地错误地声明记录。基本上,通过超严格的处理来确保没有输出无效记录。

    我们这里的代码做了一些类似的事情——给定一个由 BigQuery 生成并作为 JSON 写入 GCS 的文件,我们递归地遍历模式并进行一些类型转换。但是,我们不需要验证,因为 BigQuery 本身写入了数据。

    请注意,TableSchema对象不是Serializable 。我们已经通过将DoFnPTransform构造函数中的TableSchema转换为 JSON String并返回来解决此问题。请参阅BigQueryIO.java中使用 jsonTableSchema 变量的代码。

  2. 使用这篇博文中描述的"死信"策略来处理不良记录 - 从 PTransform 中输出有问题的 Map<String, Object> 行并将它们写入文件。这样,您可以稍后检查验证失败的行。

您可以从一些小文件开始,然后使用DirectPipelineRunner而不是DataflowPipelineRunner。直接运行器在您的计算机上运行管道,而不是在 Google Cloud 数据流服务上运行,并使用 BigQuery 流写入。我相信当这些写入失败时,您将获得更好的错误消息。

(我们对批处理作业使用 GCS->BigQuery 加载作业模式,因为它更高效、更具成本效益,但 BigQuery 流式处理写入流式处理作业,因为它们延迟低。

最后,在日志记录信息方面:

  • 一定要检查云日志记录(通过点击日志面板上的Worker Logs链接。
  • 如果运行bq命令行实用程序,则可以获得有关批处理数据流触发的加载作业失败的原因的更好信息:bq show -j PROJECT:dataflow_job_XXXXXXX

最新更新