Airflow: Issues calling TaskGroups



I'm having trouble calling TaskGroups: the error log thinks my job id is avg_speed_20220502_22c11bdf rather than just avg_speed, and I can't figure out why.

Here is my code:

with DAG(
    'debug_bigquery_data_analytics',
    catchup=False,
    default_args=default_arguments) as dag:

    # Note to self: the bucket region and the dataproc cluster should be in the same region
    create_cluster = DataprocCreateClusterOperator(
        task_id='create_cluster',
        ...
    )

    with TaskGroup(group_id='weekday_analytics') as weekday_analytics:
        avg_temperature = DummyOperator(task_id='avg_temperature')
        avg_tire_pressure = DummyOperator(task_id='avg_tire_pressure')
        avg_speed = DataprocSubmitPySparkJobOperator(
            task_id='avg_speed',
            project_id='...',
            main=f'gs://.../.../avg_speed.py',
            cluster_name=f'spark-cluster-{{ ds_nodash }}',
            region='...',
            dataproc_jars=['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'],
        )
        avg_temperature >> avg_tire_pressure >> avg_speed

    delete_cluster = DataprocDeleteClusterOperator(
        task_id='delete_cluster',
        project_id='...',
        cluster_name='spark-cluster-{{ ds_nodash }}',
        region='...',
        trigger_rule='all_done',
    )

    create_cluster >> weekday_analytics >> delete_cluster

Here is the error message I get:

google.api_core.exceptions.InvalidArgument: 400 Job id 'weekday_analytics.avg_speed_20220502_22c11bdf' must conform to '[a-zA-Z0-9]([a-zA-Z0-9-_]{0,98}[a-zA-Z0-9])?' pattern
[2022-05-02, 11:46:11 UTC] {taskinstance.py:1278} INFO - Marking task as FAILED. dag_id=debug_bigquery_data_analytics, task_id=weekday_analytics.avg_speed, execution_date=20220502T184410, start_date=20220502T184610, end_date=20220502T184611
[2022-05-02, 11:46:11 UTC] {standard_task_runner.py:93} ERROR - Failed to execute job 549 for task weekday_analytics.avg_speed (400 Job id 'weekday_analytics.avg_speed_20220502_22c11bdf' must conform to '[a-zA-Z0-9]([a-zA-Z0-9-_]{0,98}[a-zA-Z0-9])?' pattern; 18116)
[2022-05-02, 11:46:11 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-05-02, 11:46:11 UTC] {local_task_job.py:264} INFO - 1 downstream tasks scheduled from follow-on schedule check

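In Airflow, a task is identified by its task_id. However, when using TaskGroups you can reuse the same task_id in different groups, so a task defined inside a task group gets an identifier of the form group_id.task_id.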
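To see why the dotted identifier is rejected, the pattern from the error message can be checked with Python's standard re module. This is only a local sketch: the helper name is_valid_job_id is made up for illustration, and the character class is written as [a-zA-Z0-9_-] (same characters as in the error message, reordered so the hyphen is unambiguous).

```python
import re

# Pattern taken from the error message in the question; the character class
# is reordered to [a-zA-Z0-9_-] so the hyphen is clearly a literal.
JOB_ID_PATTERN = re.compile(r"[a-zA-Z0-9]([a-zA-Z0-9_-]{0,98}[a-zA-Z0-9])?")


def is_valid_job_id(job_id: str) -> bool:
    """Hypothetical helper: True if the whole string matches the pattern."""
    return JOB_ID_PATTERN.fullmatch(job_id) is not None


# Job id built from group_id.task_id, as seen in the error log.
print(is_valid_job_id("weekday_analytics.avg_speed_20220502_22c11bdf"))  # False
# The same job id without the TaskGroup prefix and its '.' separator.
print(is_valid_job_id("avg_speed_20220502_22c11bdf"))  # True
```

The only offending character is the '.' that separates group_id from task_id.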

For apache-airflow-providers-google>7.0.0:

The bug has been fixed. It should work now.

For apache-airflow-providers-google<=7.0.0:

You are hitting this problem because DataprocJobBaseOperator has:

:param job_name: The job name used in the DataProc cluster. This name by default
is the task_id appended with the execution data, but can be templated. The
name will always be appended with a random number to avoid name clashes.

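The problem is that Airflow adds the . character to the identifier and Google does not accept it. To work around this, override the default job_name parameter with a string of your choice. If you like, you can set it to the bare task_id (for example avg_speed).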
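If you would rather derive the name than hard-code it, one possible approach is to replace every character the pattern does not allow. The helper below is a hypothetical sketch, not part of Airflow or the Google provider; it only handles the characters, not the length limit or the rule that the first and last characters must be alphanumeric.

```python
import re


def dataproc_job_name(task_id: str) -> str:
    """Hypothetical helper: replace characters outside [a-zA-Z0-9_-] with '_'."""
    return re.sub(r"[^a-zA-Z0-9_-]", "_", task_id)


# The full task id of a task inside a TaskGroup contains a '.'.
print(dataproc_job_name("weekday_analytics.avg_speed"))  # weekday_analytics_avg_speed
```

Either the result of such a helper or a plain string such as 'avg_speed' can then be passed as job_name=... to DataprocSubmitPySparkJobOperator alongside the existing task_id argument.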

I opened https://github.com/apache/airflow/issues/23439 to report this bug; in the meantime you can follow the suggestion above.
