Catching BigQuery HttpBadRequestError on a Dataflow pipeline



Recently, my Dataflow job threw an HttpBadRequestError from the BigQuery API because the request payload size exceeded the limit.

Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
return self._flush_all_batches()
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
*[
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
self._flush_batch(destination)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
result, errors = self._insert_all_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
response = self.client.tabledata.InsertAll(request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
return self._RunMethod(
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 400,
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"errors": [
{
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"domain": "global",
"reason": "badRequest"
}
],
"status": "INVALID_ARGUMENT"
}
}
>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 483, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 519, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 990, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 730, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 732, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 733, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/common.py", line 1267, in apache_beam.runners.common.DoFnRunner.finish
File "apache_beam/runners/common.py", line 1248, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 1246, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 514, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 520, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1288, in finish_bundle
return self._flush_all_batches()
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1296, in _flush_all_batches
*[
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1297, in <listcomp>
self._flush_batch(destination)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py", line 1326, in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 984, in insert_rows
result, errors = self._insert_all_rows(
File "/usr/local/lib/python3.8/site-packages/apache_beam/utils/retry.py", line 236, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 539, in _insert_all_rows
response = self.client.tabledata.InsertAll(request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 761, in InsertAll
return self._RunMethod(
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.8/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpBadRequestError: HttpError accessing <https://bigquery.googleapis.com/bigquery/v2/projects/my-projects/datasets/my-datasets/tables/my-tables/insertAll?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Sat, 20 Feb 2021 14:21:53 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '321', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 400,
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"errors": [
{
"message": "Request payload size exceeds the limit: 10485760 bytes.",
"domain": "global",
"reason": "badRequest"
}
],
"status": "INVALID_ARGUMENT"
}
}
> [while running 'WriteBqTables/WriteBQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-25875']
passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service.cc:631

I would like to use the dead letter pattern to mitigate this kind of error in case it happens again.

Does the BQ dead letter pattern also work when an HttpBadRequestError occurs, or does it only apply when row inserts fail due to a schema mismatch? I am using the Apache Beam Python SDK, version 2.27.0.
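For reference, this is roughly what I mean by the dead letter pattern (a minimal sketch with a placeholder table and input, not my actual pipeline):

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn

# Minimal sketch of the dead letter pattern; the table name and input
# data are placeholders.
with beam.Pipeline() as p:
    rows = p | "Create" >> beam.Create([{"field": "value"}])

    result = rows | "WriteBQ" >> beam.io.WriteToBigQuery(
        table="my-projects:my-datasets.my-tables",
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
    )

    # Rows that could not be inserted are emitted on the FAILED_ROWS output.
    failed_rows = result[BigQueryWriteFn.FAILED_ROWS]
    failed_rows | "HandleFailedRows" >> beam.Map(print)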

Thanks in advance

Update 2021-02-24: I added a fuller stack trace snippet showing where the error occurs.

Yes, this pattern works. In general, it catches any failure that can be caught (occasionally a failure is severe enough that processing halts entirely).

In your particular case, the stack trace goes through this area of BigQueryIO, and you can see the failed rows being output to the dead letter PCollection just below it, here.
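As a rough sketch of what consuming that output looks like from the pipeline side (the RETRY_NEVER strategy and the dead letter table below are illustrative assumptions, not something your trace shows):

import json

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

with beam.Pipeline() as p:
    rows = p | "Create" >> beam.Create([{"field": "value"}])  # placeholder input

    result = rows | "WriteBQ" >> beam.io.WriteToBigQuery(
        table="my-projects:my-datasets.my-tables",
        method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
        # Assumption: emit failed rows on the dead letter output rather than
        # retrying them indefinitely inside the write transform.
        insert_retry_strategy=RetryStrategy.RETRY_NEVER,
    )

    # Each FAILED_ROWS element is a (destination, row) tuple.
    (result[BigQueryWriteFn.FAILED_ROWS]
     | "FormatFailedRows" >> beam.Map(lambda dest_and_row: {
         "destination": str(dest_and_row[0]),
         "row": json.dumps(dest_and_row[1]),
     })
     | "WriteDeadLetter" >> beam.io.WriteToBigQuery(  # hypothetical dead letter table
         table="my-projects:my-datasets.dead_letter",
         schema="destination:STRING,row:STRING"))

The exact shape of the failed-rows output has shifted between SDK releases, so it is worth verifying this against 2.27.0 specifically.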
