我正在使用气流进行实验,以取代我们现有的Cron编排,一切看起来都很有前途。我已经成功安装并获得了一个DAG,以安排和执行,但是我注意到它们在我指定的每个任务之间是一个重大延迟(至少15分钟至60分钟(。
> <</p>我的dag定义如下
我错过了一些让他们紧随其后运行的东西吗?
我不使用芹菜调度程序和Web服务器都在同一主机上运行是的 - 需要呼叫远程执行(在此之前使用某种形式的本地执行(并且没有在远程服务器上安装气流DAG应在UTC每天每天运行一次,请遵循我给出的任务的设定路径。
import airflow from builtins import range from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.models import DAG from datetime import datetime, timedelta args = { 'owner': 'user1', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(2), 'email': ['data-etl-errors@user1.com'], 'email_on_failure': True, 'email_on_retry': False, 'wait_for_downstream': True, 'schedule_interval': None, 'depends_on_past': True, 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( dag_id='airflow_pt1' , default_args=args , schedule_interval='0 1 * * *' , dagrun_timeout=timedelta(hours=8)) task1 = BashOperator( task_id='task1' , bash_command='ssh user1@remoteserver /path/to/remote/execution/script_task1.sh' , dag=dag,env=None, output_encoding='utf-8') task2 = BashOperator( task_id='task2' , bash_command='ssh user1@remoteserver /path/to/remote/execution/script_task2.sh' , dag=dag,env=None, output_encoding='utf-8') task3 = BashOperator( task_id='task3' , bash_command='ssh user1@remoteserver /path/to/remote/execution/script_task3.sh' , dag=dag,env=None, output_encoding='utf-8') task4 = BashOperator( task_id='task4' , bash_command='ssh user1@remoteserver /path/to/remote/execution/script_task4.sh' , dag=dag,env=None, output_encoding='utf-8') task2.set_upstream(task1) task3.set_upstream(task1) task4.set_upstream(task2)
注意我尚未执行气流回填(这重要吗?(
找到了问题我尚未将配置从AirFlow中的顺序转换为Localexecutor.cfg文件
我通过https://stlong0521.github.io/20161023 - -20Airflow.html
找到答案。并在https://www.youtube.com/watch?v=pr0frviiftu