下面是我的简单dag/python脚本,它位于Google Cloud Bucke上的DAGS文件夹中。
from airflow import DAG
import airflow
from airflow.operators import BashOperator
from datetime import datetime,timedelta , date
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from generate_csv_feeds import generate_csv
DEFAULT_DAG_ARGS = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.utcnow(),
'email_on_failure': False,
'schedule_interval':'*/5 * * * *'
}
with DAG('DAG_MAIN',default_args=DEFAULT_DAG_ARGS,catchup=False) as dag:
generate_csv = PythonOperator(
task_id='generate_mktg_csv',
python_callable=generate_csv,
op_args=['get_data.sql','feeds_data_airflow.csv']
)
csv_generated = BashOperator(
task_id='csv_generated',
bash_command='echo CSV Generated Succesfully.')
generate_csv >> csv_generated
问题是,如果我通过命令行触发它,它根本不会自动触发。但是奇怪的是,当我从气流UI运行它时,它起作用。我需要一次每5分钟运行一次。我不确定这是否与Google Composer有关。任何帮助,将不胜感激 。预先感谢
我认为这是由于您的start_date
是datetime.utcnow()
。不建议使用移动start_date
,尤其是datetime.utcnow()
,因为DAG在start_date + schedule_interval
处触发,并且随着start_date
的移动,从未触发DAG。请参阅FAQ https://airflow.apache.org/faq.html#what s-the-the-deal-with-with-with-date。
尝试使用固定的start_date
,例如datetime(2019, 08, 04)
。