如何在 Airflow 中向 Python 程序提交参数?



我们有一个前端服务器,它直接使用 dag API 执行 dag(DagBag()get_dag()然后dag_run()

(Dags 运行良好,问题是,我们找不到一种使用特定参数执行此类 dag 的方法。

最接近的解决方案是使用变量 API,它使用set()get()方法,但这些变量是全局的,并且在可能使用相同的变量名称的并发操作中工作时可能会发生冲突。

我们如何运行 dag 并设置可用于其执行的参数?我们主要使用PythonOperator。

编辑1:我们的程序是一个Python Django前端服务器。因此,我们正在通过另一个Python程序与Airflow交谈。这意味着我们通过 Python 触发 dag,因此,使用DagBag.get_dag()从气流服务中检索信息。run_dag()没有办法传递直接参数

如果使用trigger_dag_run(通过命令行或从另一个 dag(触发 dag,则可以将任何 json 作为有效负载传递。

另一种选择是将参数列表存储在文件中,并将该文件位置存储为变量。然后,DAG 可以将此文件位置传递给 python 运算符,然后操作员可以处理读取该文件并从中分析参数。

如果这两种解决方案都不适用于您的用例,则提供有关 dag 和参数类型的更多详细信息可能会有所帮助。

最新更新