升级到Airflow 2后,Cloud Composer上的DataflowTemplateOperator作业失败



我们在当前的composer环境1.16.0中使用气流1.10.15运行了许多DataflowTemplateOperator作业(JDBC到BigQuery模板(。然而,在composer 1.17.6 airflow 2.1.4下尝试运行相同的DAG时,我们得到了以下错误:

[2021-12-07 03:08:56,478] {taskinstance.py:1465} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1166, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
result = task_copy.execute(context=context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 682, in execute
job = self.hook.start_template_dataflow(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 99, in inner_wrapper
raise AirflowException(
airflow.exceptions.AirflowException: The mutually exclusive parameter `location` and `region` key in `variables` parameter are both present. Please remove one.

我们确实在dataflow_default_options中设置了区域参数,它们在气流1下呈现如下:

{'gcpTempLocation': 'gs://us-east1-xxxx/tmp/',
'machineType': 'n2-standard-2',
'project': 'xxxx',
'region': 'us-east1',
'runner': 'DataflowRunner'}

但区域参数似乎无法再通过气流2下的dataflow_default_options进行设置。尝试设置";位置";而不是";区域";无效,作业默认为us-central1。

两个环境都使用相同的模板,这已在Dataflow作业中得到验证。

我们设置区域的原因是因为我们启动了许多数据流任务,如果我们不设置它们,那么cpu配额就会受到影响。我们的us-east1 cpu配额增加了。

任何建议都将不胜感激。

谢谢。

很高兴知道您能够解决问题。我将这个答案留给DataflowTemplateOperator当前版本的社区可见性。如果您认为合适,请随时更新答案。

  • 数据流模板操作员气流V1
  • 数据流模板作业开始操作员气流V2

此外,您还可以在这里找到两个版本的官方DataflowTemplateOperator使用示例。

您可以在操作员级别设置location属性,而不是使用区域:

init_kwargs = {
...
"location": "us-east1",
"dataflow_default_options": {
# "region": "us-east1",
},
...
}
operator = DataflowTemplatedJobStartOperator(**init_kwargs)

或显式硬编码location: None,:

init_kwargs = {
...
"location": None,
"dataflow_default_options": {
"region": "us-east1",
},
...
}
operator = DataflowTemplatedJobStartOperator(**init_kwargs)

附言:修复程序已合并https://github.com/apache/airflow/commit/810b5d4da4396cedcd483d20e50873c2b81cf5ad,可能包含在2.6.1版本或更高版本中。

最新更新