我在Airflow上扩展了现有的PythonOperator,如下所示:
class myPythonOperator(PythonOperator):
def __init__(self,**kwargs) -> None:
self.name = kwargs.get("name", "name is not provided")
def execute(self, context,**kwargs):
print(self.name)
super(myPythonOperator, self).execute(context)
我的任务被定义为:
def task1(**kwargs):
name = kwargs.get("name", "name is not provided")
print(name)
使用以下DAG:
myTask = myPythonOperator(
task_id='myTask',
python_callable = task1,
op_kwargs={"name": "{{ dag_run.conf['name'] }}"},
provide_context=True
)
当触发DAG时,我从Airflow web UI提供了一个配置JSON,它是{"name":"foo"}
但问题是JSON中指定的名称只能从task1访问,在ececute()
中它将始终打印name is not provided
有人知道从操作员的__init__()
函数访问这个dag_run.conf
的技巧吗?
任何帮助都将不胜感激。感谢
从继承类访问dag.run_config的方法是使用Airflow中的template_field,可以在中找到