REST参数下气流中的动态任务



我触发气流DAG并传递REST参数。在REST参数列表中,我想重复这个DAG中的一些任务。经过几次尝试,我被卡住了,我不确定这是否可能。试试看:

def determine_rest_params(**kwargs):
values_comma_sep = kwargs["dag_run"].conf["myparam"]
values= []
if values_comma_sep :
values= values_comma_sep .split(",")
return values

def create_task_for_param(p, **kwargs)
# create an operator instance
with airflow.DAG("get_prediction2", default_args=default_args, schedule_interval=None) as dag:

start = DummyOperator(
task_id='start',
dag=dag
)
params = determine_rest_params()
for cur_p in params:
cur_task = create_task_for_param(cur_p)
start >> cur_task

我只看到启动任务,没有看到其他操作员。一般来说可能吗?当做Oli

您可以尝试以下操作:(不确定是否有效(将for循环更改为:

for i in range(0, len(params):
params[i] = create_task_for_param(params[i])
if i == 0:
start.set_downstream(params[i])
else:
params[i-1].set_downstream(parmas[i])

我也不确定你是否得到了正确的方式。打印出来,看看你是否得到参数。如果是,那么上面的for循环应该有效。如果没有,你可以尝试在启动任务中获取params。