无法使用TriggerDagRunOperator理解气流中有效载荷的概念。请帮助我以非常简单的方式理解这个术语。
TriggerDagRunOperator
会触发指定dag_id
的 DAG 运行。这需要一个类型为string
的trigger_dag_id
和一个python_callable参数,该参数是对 python 函数的引用,该函数将在传递它时调用context
对象和一个占位符对象obj
,以便您的可调用对象填充并返回(如果要创建 DagRun(。此obj
对象包含一个run_id
和payload
属性,您可以在函数中修改该属性。
run_id
应该是该 DAG 运行的唯一标识符,并且有效负载必须是在执行该 DAG 运行时可供任务使用的可选取对象。您的函数标头应如下所示def foo(context, dag_run_obj):
可拾取只是意味着它可以由 pickle 模块序列化。 对此有基本的了解,请参阅可以腌制和不腌制的是什么?。pickle 协议提供了更多详细信息,并显示了类如何自定义该过程。
参考: https://github.com/apache/airflow/blob/d313d8d24b1969be9154b555dd91466a2489e1c7/airflow/operators/dagrun_operator.py#L37
希望这有帮助,我也在努力设置动态有效载荷:
num_runs = 3
runs = [str(uuid.uuid4()) for _ in range(num_runs)]
run_dags = TriggerDagRunOperator.partial(
task_id='test_07_few_opt_ins_triggered_dag',
trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
trigger_run_id=runs,
conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)
上面我们有 3 次运行,我们需要设置expand
用相同数量的"运行"填充 conf
。然后,在触发的 DAG 中:
@task
def start(dag_run=None):
print(f"consuming line {dag_run.conf.get('line')}")
start()