想要在 Airflow 中使用 TriggerDagRunOperator 来触发许多子匕首,只需使用 Main-dag



无法使用TriggerDagRunOperator理解气流中有效载荷的概念。请帮助我以非常简单的方式理解这个术语。

TriggerDagRunOperator会触发指定dag_id的 DAG 运行。这需要一个类型为stringtrigger_dag_id和一个python_callable参数,该参数是对 python 函数的引用,该函数将在传递它时调用context对象和一个占位符对象obj,以便您的可调用对象填充并返回(如果要创建 DagRun(。此obj对象包含一个run_idpayload属性,您可以在函数中修改该属性。

run_id应该是该 DAG 运行的唯一标识符,并且有效负载必须是在执行该 DAG 运行时可供任务使用的可选取对象。您的函数标头应如下所示def foo(context, dag_run_obj):

可拾取只是意味着它可以由 pickle 模块序列化。 对此有基本的了解,请参阅可以腌制和不腌制的是什么?。pickle 协议提供了更多详细信息,并显示了类如何自定义该过程。

参考: https://github.com/apache/airflow/blob/d313d8d24b1969be9154b555dd91466a2489e1c7/airflow/operators/dagrun_operator.py#L37

希望这有帮助,我也在努力设置动态有效载荷:

num_runs = 3
runs = [str(uuid.uuid4()) for _ in range(num_runs)]
run_dags = TriggerDagRunOperator.partial(
task_id='test_07_few_opt_ins_triggered_dag',
trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
trigger_run_id=runs,
conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)

上面我们有 3 次运行,我们需要设置expand用相同数量的"运行"填充 conf

。然后,在触发的 DAG 中:

@task
def start(dag_run=None):
print(f"consuming line {dag_run.conf.get('line')}")
start()

最新更新