我试图设置一个气流DAG,提供dag_run.conf
的默认值。当从web运行DAG时,使用"Run w/Config"选择。但是,当在计划上运行时,dag_run.conf
字典不存在,任务将失败,例如
jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'
下面是一个示例作业。
是否有可能使dag_run.conf
总是包含params
在这里定义的字典?
from airflow import DAG
from airflow.utils.dates import hours_ago
from airflow.operators.bash import BashOperator
from datetime import timedelta
def do_something(val1: str, val2: str) -> str:
return f'echo vars are: "{val1}, {val2}"'
params = {
'key1': 'def1',
'key2': 'def2',
}
default_args = {
'retries': 0,
}
with DAG(
'template_test',
default_args=default_args,
schedule_interval=timedelta(minutes=1),
start_date=hours_ago(1),
params = params,
) as dag:
hello_t = BashOperator(
task_id='example-command',
bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'),
config=params,
)
我所见过的最接近的是在Apache气流,我如何通过CLI手动触发DAG时传递参数?但是,它们利用了Jinja和if/else——这将需要定义两次这些默认参数。我想只定义它们一次。
您可以使用DAG参数来实现您正在寻找的内容:
params (dict) - DAG级参数的字典,可以在模板中访问,命名空间位于params下。可以在任务级别重写这些参数。
您可以在DAG或Task级别定义params
,也可以在Trigger DAG w/config部分的UI中添加或修改它们。
DAG例子:
default_args = {
"owner": "airflow",
}
dag = DAG(
dag_id="example_dag_params",
default_args=default_args,
schedule_interval="*/5 * * * *",
start_date=days_ago(1),
params={"param1": "first_param"},
catchup=False,
)
with dag:
bash_task = BashOperator(
task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}"
)
输出日志:
[2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01
[2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_dag_params
AIRFLOW_CTX_TASK_ID=bash_task
AIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00
[2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location:
/tmp
[2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo bash_task: first_param']
[2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output:
[2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param
[2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0
从日志中,注意到dag_run
是调度的,参数仍然在那里。
你可以在这个回答中找到一个关于使用参数的更广泛的例子。
希望这对你有用!