BigQueryExecuteQueryOperator timeout issue (how to increase the timeout)



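I have run into a problem in Airflow. In short: I am trying to move from version 1.10.14 to 2.2.2. When I replaced BigQueryOperator with BigQueryExecuteQueryOperator, I tried to increase jobTimeoutMs as the BigQuery API documentation describes, but I still see the problem.

The problem appears whenever the job runs for longer than 1 minute. With the old BigQuery operator on Airflow 1.10.14 and the same Google Cloud connection configuration, I never saw this kind of problem.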

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

mobile_push_stat = BigQueryExecuteQueryOperator(
    task_id="mobile_push_stat",
    sql="/sql/updater/mobile_push_stat.sql",
    use_legacy_sql=False,
    api_resource_configs={"jobTimeoutMs": "3600000"},
    gcp_conn_id="bigquery_work",
)

Logs

*** Reading local file: /srv/airflow/logs/ExtensionPushStat/mobile_push_stat/2021-12-05T06:00:00+00:00/29.log
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [queued]>
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1242} INFO - Starting attempt 29 of 29
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1243} INFO - 
--------------------------------------------------------------------------------
[2021-12-06, 17:33:11 UTC] {taskinstance.py:1262} INFO - Executing <Task(BigQueryExecuteQueryOperator): mobile_push_stat> on 2021-12-05 06:00:00+00:00
[2021-12-06, 17:33:11 UTC] {base_task_runner.py:141} INFO - Running on host: airflow-vm-v2
[2021-12-06, 17:33:11 UTC] {base_task_runner.py:142} INFO - Running: ['airflow', 'tasks', 'run', 'ExtensionPushStat', 'mobile_push_stat', 'scheduled__2021-12-05T06:00:00+00:00', '--job-id', '3235', '--raw', '--subdir', 'DAGS_FOLDER/ExtensionPushStat.py', '--cfg-path', '/tmp/tmpk6p4qql0', '--error-file', '/tmp/tmpwz3z5o0y']
[2021-12-06, 17:33:12 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat [2021-12-06, 17:33:12 UTC] {dagbag.py:500} INFO - Filling up the DagBag from /srv/airflow/dags/ExtensionPushStat.py
[2021-12-06, 17:34:11 UTC] {local_task_job.py:206} WARNING - Recorded pid 137796 does not match the current pid 137817
[2021-12-06, 17:34:11 UTC] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 137817
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat Running <TaskInstance: ExtensionPushStat.mobile_push_stat scheduled__2021-12-05T06:00:00+00:00 [running]> on host airflow-vm-v2
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat Traceback (most recent call last):
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/bin/airflow", line 8, in <module>
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     sys.exit(main())
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/__main__.py", line 48, in main
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     args.func(args)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return func(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return f(*args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     _run_task_by_selected_method(args, dag, ti)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     _run_raw_task(args, ti)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     ti._run_raw_task(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return func(*args, session=session, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     self._execute_task_with_callbacks(context)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     result = self._execute_task(context, self.task)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     result = execute_callable(context=context)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/bigquery.py", line 693, in execute
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     job_id = self.hook.run_query(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 2325, in run_query
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     job = self.insert_job(configuration=configuration, project_id=self.project_id)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 425, in inner_wrapper
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return func(self, *args, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1639, in insert_job
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     job.result()
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1450, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     do_get_result()
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1440, in do_get_result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     super(QueryJob, self).result(retry=retry, timeout=timeout)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 727, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 130, in result
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     self._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/cloud/bigquery/job/query.py", line 1199, in _blocking_poll
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/future/polling.py", line 108, in _blocking_poll
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     retry_(self._done_or_raise)(**kwargs)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     return retry_target(
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/google/api_core/retry.py", line 220, in retry_target
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     time.sleep(sleep)
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat   File "/srv/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1413, in signal_handler
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat     raise AirflowException("Task received SIGTERM signal")
[2021-12-06, 17:34:11 UTC] {base_task_runner.py:122} INFO - Job 3235: Subtask mobile_push_stat airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-12-06, 17:34:11 UTC] {process_utils.py:66} INFO - Process psutil.Process(pid=137817, status='terminated', exitcode=1, started='17:33:11') (137817) terminated with exit code 1

Update

This problem is not related to BigQuery or to the Google provider.

It is related to two Airflow configuration parameters:

killed_task_cleanup_time - needs to be increased for long-running jobs.
job_heartbeat_sec - relevant when you want to clear the state of a failed task and re-run it from the UI.
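
As a rough illustration of how the two parameters named above could be overridden without editing airflow.cfg, the sketch below builds the environment variable names using Airflow's AIRFLOW__{SECTION}__{KEY} convention. It assumes killed_task_cleanup_time lives in the [core] section and job_heartbeat_sec in the [scheduler] section; check the configuration reference for your version. The helper name airflow_env_var and the values are hypothetical and chosen only for illustration.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} environment variable name."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Assumed sections ([core], [scheduler]) and hypothetical values, in seconds.
overrides = {
    airflow_env_var("core", "killed_task_cleanup_time"): "3600",
    airflow_env_var("scheduler", "job_heartbeat_sec"): "5",
}

# Print lines that could be pasted into a shell profile or a service unit file.
for name, value in overrides.items():
    print(f"export {name}={value}")
```

The exported variables would need to be visible to the scheduler and worker processes, which then have to be restarted for the new values to apply.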

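BigQueryExecuteQueryOperator is deprecated (see the source code). You should use BigQueryInsertJobOperator instead.

According to the API parameters and the operator source code, the syntax should be: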

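from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

mobile_push_stat = BigQueryInsertJobOperator(
    task_id="mobile_push_stat",
    gcp_conn_id="bigquery_work",
    configuration={
        # Query settings are nested under "query" in the jobs.insert configuration.
        "query": {
            # Render the SQL file through Jinja instead of passing its path as the query text.
            "query": "{% include '/sql/updater/mobile_push_stat.sql' %}",
            "useLegacySql": False,
        },
        # Job-level timeout; jobs.insert names this field jobTimeoutMs, not timeoutMs.
        "jobTimeoutMs": "3600000",
    },
)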

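Edit: note the limits on the timeout. The value 3600000 that you chose may be too high.

You can request a longer timeout in the timeoutMs field. However, the call is not guaranteed to wait for the specified timeout; it typically returns after about 200 seconds (200,000 milliseconds), even if the query is not complete.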
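
To put the two figures quoted above side by side, here is a small conversion. It is plain arithmetic on the millisecond values and makes no BigQuery calls; the variable names are only illustrative.

```python
from datetime import timedelta

# The timeout requested in the question, in milliseconds.
requested = timedelta(milliseconds=3_600_000)
# The point at which the documentation says the call typically returns.
typical_return = timedelta(milliseconds=200_000)

print(requested)                   # 1:00:00
print(typical_return)              # 0:03:20
print(requested / typical_return)  # 18.0
```

The requested wait is therefore 18 times longer than the roughly 200 seconds after which the call usually returns.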
