气流——使用金贾的宏变量来创建动态DAG



我需要创建一个动态DAG,默认情况下,DAG变量或(next_execution_date - 1天)周期中的每个日期都有单独的任务(有必要使用DAG的执行日期)。我爸爸的一个例子:

dag_vars = Variable.get("dag_dates", deserialize_json=True) # dag_dates = {"dag_start_dt": "NULL", "dag_end_dt": "NULL"}, but can be different dates
DAG_NAME = "dag_test"
def get_params(vars):
if vars["dag_start_dt"] == "NULL":
start_dt = "{{(next_execution_date - macros.timedelta(days=1))}}"
else:
start_dt = vars["dag_start_dt"]
if vars["dag_end_dt"] == "NULL":
end_dt = "{{ next_execution_date }}"
else:
end_dt = vars["dag_end_dt"]
return start_dt, end_dt
start_dag_params, end_dag_params = get_params(dag_vars)
def get_dag_daterange(start_date, end_date):
for n in range(int((end_date - start_date).days)):
yield start_date + timedelta(n)

dag = DAG(
dag_id=DAG_NAME,
default_args=default_args,
schedule_interval= None,
concurrency=1,
max_active_runs=1,
)
with dag:
start_date, end_date = start_dag_params, end_dag_params
for one_date in get_dag_daterange(start_date, end_date):
task_1 = PostgresOperator(
sql = """CALL test_procedure({l_one_date})""".format(l_one_date=one_date),
task_id = "test_procedure_{l_one_date}".format(l_one_date=str(one_date)),
postgres_conn_id = "xxx",
pool = "pool_test",
dag = dag,
autocommit = True,
)

但是我有一个错误&;不支持的操作数类型-:'str'和'str'&;

我知道原因是在宏({{next_execution_date}})中,这是通过Jinja在运行时解析的,但我不知道如何解决这个问题,我如何使用宏作为气流DAG中的变量。

我很乐意得到任何帮助。谢谢!

不幸的是,没有办法在某些任务的运行时之外访问宏。有两种变通方法:

  1. 尝试在没有运行时的情况下获取数据。你可以得到next_execution_date使用DAG.next_dagrun_info()方法超出运行时。
  2. 在运行时创建。你可以创建一个普通的(非动态的)DAG,其中包含SubDagOperator,在那里你可以根据数据创建和触发(可能)动态创建的DAG,可在运行时访问。

相关内容

  • 没有找到相关文章

最新更新