在气流中动态执行任务



考虑我有一个气流 DAG 包含三个任务 t1,t2,t3。发展议程集团的流程将是t1>>t2>>t3. 我想找到一种动态更改启动任务的方法。如果我在运行时给出 t1,DAG 应该从 t1 触发。如果我给出 t2,则应跳过 t1,DAG 执行应从 t2 开始。有没有办法在气流中做到这一点?

有一种方法,但不是你描述的那样。 添加t0作为BranchPythonOperator,这将决定是继续t1还是跳过它并继续t2

def choose(ti):
if something:
return 't1'
return 't2'
t0 = BranchPythonOperator(
task_id='t0',
python_callable=choose)
t1 = DummyOpeartor(task_id='t1')
t2 = DummyOpeartor(task_id='t2', trigger_rule='one_success')
t3 = DummyOpeartor(task_id='t3')
t0 >> [t1, t2] >> t3
t1 >> t2

最新更新