如何从大查询模式错误中获得更好的日志



我遇到了同样的问题:读取数据时出错,错误消息:JSON表遇到太多错误,放弃。行,我很确定它与模式有关:

RuntimeError: BigQuery job beam_bq_job_LOAD_AUTOMATIC_JOB_NAME_LOAD_STEP_... failed. Error Result: <ErrorProto location: 'gs://dataflow/tmp/bq_load/some_file'
message: 'Error while reading data, error message: JSON table encountered too many errors, giving up. Rows: 1; errors: 1. Please look into the errors[] collection for more details. File: gs://some_file'
reason: 'invalid'> [while running 'WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs-ptransform-27']

这里的问题是,我有一个大的模式(运行数据流作业),只是检查它的小错误是乏味的。是否有任何方法可以看到更好的错误消息/获得更多的日志,实际上确定模式的哪个部分是错误的?

我经常与Beam,PythonBigQueryIO有同样的问题,在这种情况下错误不清楚,模式中的坏字段没有显示。

为了解决这类问题,我通常在输入元素中使用模式或对象验证,并在错误元素中使用死信队列。

然后我将错误沉入BigQuery表中进行分析。

我创建了一个库来简化Beam的错误处理,叫做Asgarde:

# Beam pipeline with Asgarde library.
input_teams: PCollection[str] = p | 'Read' >> beam.Create(team_names)
result = (CollectionComposer.of(input_teams)
.map('Map with country', lambda tname: TeamInfo(name=tname, country=team_countries[tname], city=''))
.map('Map with city', lambda tinfo: TeamInfo(name=tinfo.name, country=tinfo.country, city=team_cities[tinfo.name]))
.filter('Filter french team', lambda tinfo: tinfo.country == 'France'))
result_outputs: PCollection[TeamInfo] = result.outputs
result_failures: PCollection[Failure] = result.failures

包装器CollectionComposer是从PCollection创建的,这个结构返回:

  • PCollection输出良好
  • PCollectionof failures

故障由Failure对象表示:

@dataclass
class Failure:
pipeline_step: str
input_element: str
exception: Exception

您可以将FailurePCollection下沉到BigQuery表中进行分析。

你也可以查看这篇文章死信队列错误与Beam, Asgarde, Dataflow和警报在实时

我也和你分享:

  • Beam本地错误处理示例
  • Beam和Asgarde的错误处理示例

最新更新