气流DAG在解析时每30秒执行一次代码,而不是每天执行一次



我正试图使一个'插入一个'收集到Apache气流中的mongo DAG。我设置了schedule_interval="@daily",但是DAG每30秒解析一次(没有触发,但是解析)。插入是每30秒执行一次,而不是每天执行一次。在搜索过程中,我发现在web服务器docker容器上,您可以修改'min_file_process_interval'行从'airflow.cfg'文件,但它不成功。

期望的结果是代码每30秒解析一次,每天触发一次。

有类似的SO话题,但没有答案:气流DAG每30秒自动刷新一次

最小可复制示例:

default_args = {
'owner': 'kw',
'retries': 1,
'retry_delay': timedelta(minutes=15),
}
doc_raw = {
'name': "Chaitanya"
}
MONGO_CONN_ID = 'mongo_conn'
MONGO_COLLECTION = 'airflowtest'
def print_hello():
return 'Hello world from first Airflow DAG!'
with DAG(
dag_id='kw_mongo_test4',
default_args=default_args,
start_date=datetime(2022, 6, 27),
schedule_interval="@daily",
description='use case of mongo operator in airflow',
catchup=False) as dag:
task1 = PythonOperator(task_id='hello_task', python_callable=print_hello)
task2 = MongoHook(
task_id="mongo_insert_test",
mongo_conn_id=MONGO_CONN_ID,
mongo_collection=MONGO_COLLECTION,
mongo_db=MONGO_COLLECTION,
default_conn_name = 'mongo_default',
conn_id='mongo_conn',
conn_type = 'mongo_conn',
).insert_one(
mongo_collection = 'airflowtest',
doc=doc_raw,
)

钩子不同于操作符。您正在使用一个钩子,它不会在DAG中注册为任务。我没有看到一个可用的MongoOperator,所以你需要在PythonOperator中运行你的MongoHook命令。

default_args = {
'owner': 'kw',
'retries': 1,
'retry_delay': timedelta(minutes=15),
}
doc_raw = {
'name': "Chaitanya"
}
MONGO_CONN_ID = 'mongo_conn'
MONGO_COLLECTION = 'airflowtest'
def print_hello():
return 'Hello world from first Airflow DAG!'
def insert_to_mongo():
hook = MongoHook(MONGO_CONN_ID)
hook.insert_one(mongo_collection=MONGO_COLLECTION, doc=doc_raw)
with DAG(
dag_id='kw_mongo_test4',
default_args=default_args,
start_date=datetime(2022, 6, 27),
schedule_interval="@daily",
description='use case of mongo operator in airflow',
catchup=False) as dag:
task1 = PythonOperator(task_id='hello_task', python_callable=print_hello)
task2 = PythonOperator(task_id='mongo_insert_test', python_callable=insert_to_mongo)

最新更新