在Airflow上的自定义Python操作符中访问dag_run.conf



我在Airflow上扩展了现有的PythonOperator,如下所示:

class myPythonOperator(PythonOperator):
def __init__(self,**kwargs) -> None:
self.name = kwargs.get("name", "name is not provided")      
def execute(self, context,**kwargs):
print(self.name)
super(myPythonOperator, self).execute(context)

我的任务被定义为:

def task1(**kwargs):
name = kwargs.get("name", "name is not provided")
print(name)

使用以下DAG:

myTask = myPythonOperator(
task_id='myTask',
python_callable = task1,
op_kwargs={"name": "{{ dag_run.conf['name'] }}"},
provide_context=True
)

当触发DAG时,我从Airflow web UI提供了一个配置JSON,它是{"name":"foo"}

但问题是JSON中指定的名称只能从task1访问,在ececute()中它将始终打印name is not provided

有人知道从操作员的__init__()函数访问这个dag_run.conf的技巧吗?

任何帮助都将不胜感激。感谢

从继承类访问dag.run_config的方法是使用Airflow中的template_field,可以在中找到

最新更新