气流如何从谷歌云平台上的dags主文件夹加载/更新DagBag



请不要否决我的答案。如果需要,我会更新并更正我的文字。我已经做了家庭作业研究。我是一个小新手,所以试着去理解这一点。

我想了解谷歌云平台上的气流是如何从dags主文件夹到UI的变化的。此外,请帮助我的dags设置脚本。我在读书的同时也读了很多答案。图书链接在这里

我试着从第69页找出答案,上面写着

3.11调度和;触发器气流调度程序监视所有任务和所有DAG,并触发其依赖项具有已满足。在幕后,它监视并与它可能包含的所有DAG对象的文件夹,并定期(每分钟左右(检查活动任务,看看它们是否可以已触发

我从这本书中了解到,调度器定期从dags主文件夹中进行更改。(正确吗?(

我还阅读了关于堆栈溢出的多个答案,我发现这个答案很有用链接

但答案仍然不包含从dag主文件夹中的script.py创建/更新dagbag的过程。如何感知变化。

请帮我的dags设置脚本。我们创建了一个通用的python脚本,它通过读取/迭代配置文件来动态创建dag。

下面是目录结构

/dags/workflow/
/dags/workflow/config/dag_a.json
/dags/workflow/config/dag_b.json
/dags/workflow/task_a_with_single_operator.py
/dags/workflow/task_b_with_single_operator.py
/dags/dag_creater.py

执行流dag_creater.py如下:-

1. Iterate in dags/workflow/config folder get the Config JSON file and
read variable dag_id.
2. create Parent_dag = DAG(dag_id=dag_id,
start_date=start_date, schedule_interval=schedule_interval,
default_args=default_args, catchup=False) 
3. Read tasks with dependencies of that dag_id from config json file
(example :- [[a,[]],[b,[a]],[c,[b]]]) and code it as task_a >>
task_b >> task_c

通过这种方式创建dag。一切都很好。dagger在UI上也可见,运行良好。

但问题是,我的dag创作脚本每次都在运行。即使在每个任务日志中,我也会看到所有dag的日志。我希望这个脚本运行一次。只是为了填充元数据中的条目。我不明白为什么它每次都在运行。请让我了解这个过程。

我知道一旦我们第一次设置元数据,airflow initdb就会运行。所以这不是一直在做这个更新。

  • 是调度程序心跳更新全部吗
  • 我的设置正确吗

请注意:我不能键入真实代码,因为这是我的限制组织不过,如果被问到,我会提供更多信息。

Airflow Scheduler实际上是在Airflow运行时环境中连续运行的,它是监视DAG文件夹中的更改并触发驻留在该文件夹中的相关DAG任务的主要贡献者。Airflow Scheduler服务的主要设置可以在airflow.cfg文件中找到,本质上是有效影响常规DAG任务维护的心跳间隔。

然而,特定任务的执行方式是根据气流配置中的执行者模型定义的。

为了存储可用于Airflow运行时环境的DAG,GCP Composer使用云存储,实现特定的文件夹结构,同步到达/dags文件夹的任何对象,如果该对象包含DAG定义,则验证其扩展名为*.py

如果您希望在Airflow运行时内运行DAG扩展脚本,那么在这个特定的用例中,我建议您查看PythonOperator,在单独的DAG中使用它来调用和执行您的自定义通用Python代码,并保证每次只调度一次。您可以查看此Stack线程的实现详细信息。

最新更新