Setting up Airflow with a remote Celery worker



I have set up Apache Airflow on a virtual machine inside my local network, and I want to run an additional Celery worker on my local machine that stays in sync with the rest of the Airflow system. So far, after I start the worker on my local machine, the DAGs that exist on the local machine are not visible in the web server (which runs on the VM), but they do show up once I run airflow dags reserialize on the local machine.
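For reference, the worker on my local machine is started the usual way; the sketch below shows the relevant pieces (connection strings, host names and the queue name are placeholders, not my real values):

# airflow.cfg on the local machine -- must point at the same metadata DB and broker as the VM
[core]
executor = CeleryExecutor
dags_folder = /Users/wilbertung/Documents/lowitest/airflow/dags

[database]   # this key lives under [core] in Airflow versions before 2.3
sql_alchemy_conn = postgresql+psycopg2://airflow:<password>@<vm-host>:5432/airflow

[celery]
broker_url = redis://<vm-host>:6379/0
result_backend = db+postgresql://airflow:<password>@<vm-host>:5432/airflow

# start the additional worker on the local machine
airflow celery worker --queues default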

After running the reserialize command, I see these messages in the worker log:

[2022-06-07 09:54:41,661] {dagbag.py:507} INFO - Filling up the DagBag from /Users/wilbertung/Documents/lowitest/airflow/dags
[2022-06-07 09:54:41,680] {dagbag.py:507} INFO - Filling up the DagBag from None
[2022-06-07 09:54:41,809] {dag.py:2379} INFO - Sync 2 DAGs
[2022-06-07 09:54:41,853] {dag.py:2923} INFO - Setting next_dagrun for ChiSo to 2022-06-06T01:54:41.852752+00:00, run_after=2022-06-07T01:54:41.852752+00:00
[2022-06-07 09:54:41,853] {dag.py:2923} INFO - Setting next_dagrun for lowi17 to 2022-06-06T16:00:00+00:00, run_after=2022-06-07T16:00:00+00:00

Then, in the scheduler log, I get the following messages:

[2022-06-07 09:54:42,473] {scheduler_job.py:353} INFO - 3 tasks up for execution:
<TaskInstance: lowi17.台灣醒報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
<TaskInstance: lowi17.台灣新生報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
<TaskInstance: lowi17.華視新聞網 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
[2022-06-07 09:54:42,473] {scheduler_job.py:418} INFO - DAG lowi17 has 0/16 running and queued tasks
[2022-06-07 09:54:42,473] {scheduler_job.py:418} INFO - DAG lowi17 has 1/16 running and queued tasks
[2022-06-07 09:54:42,473] {scheduler_job.py:418} INFO - DAG lowi17 has 2/16 running and queued tasks
[2022-06-07 09:54:42,473] {scheduler_job.py:504} INFO - Setting the following tasks to queued state:
<TaskInstance: lowi17.台灣醒報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
<TaskInstance: lowi17.台灣新生報 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
<TaskInstance: lowi17.華視新聞網 manual__2022-06-06T06:00:03.787848+00:00 [scheduled]>
[2022-06-07 09:54:42,476] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='lowi17', task_id='台灣醒報', run_id='manual__2022-06-06T06:00:03.787848+00:00', try_number=3, map_index=-1) to executor with priority 1 and queue default
[2022-06-07 09:54:42,476] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'lowi17', '台灣醒報', 'manual__2022-06-06T06:00:03.787848+00:00', '--local', '--subdir', '/Users/wilbertung/Documents/lowitest/airflow/dags/DAG_lowi50.py']
[2022-06-07 09:54:42,477] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='lowi17', task_id='台灣新生報', run_id='manual__2022-06-06T06:00:03.787848+00:00', try_number=3, map_index=-1) to executor with priority 1 and queue default
[2022-06-07 09:54:42,477] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'lowi17', '台灣新生報', 'manual__2022-06-06T06:00:03.787848+00:00', '--local', '--subdir', '/Users/wilbertung/Documents/lowitest/airflow/dags/DAG_lowi50.py']
[2022-06-07 09:54:42,477] {scheduler_job.py:546} INFO - Sending TaskInstanceKey(dag_id='lowi17', task_id='華視新聞網', run_id='manual__2022-06-06T06:00:03.787848+00:00', try_number=3, map_index=-1) to executor with priority 1 and queue default
[2022-06-07 09:54:42,477] {base_executor.py:91} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'lowi17', '華視新聞網', 'manual__2022-06-06T06:00:03.787848+00:00', '--local', '--subdir', '/Users/wilbertung/Documents/lowitest/airflow/dags/DAG_lowi50.py']
[2022-06-07 09:54:42,621] {scheduler_job.py:599} INFO - Executor reports execution of lowi17.台灣醒報 run_id=manual__2022-06-06T06:00:03.787848+00:00 exited with status failed for try_number 3
[2022-06-07 09:54:42,621] {scheduler_job.py:599} INFO - Executor reports execution of lowi17.台灣新生報 run_id=manual__2022-06-06T06:00:03.787848+00:00 exited with status failed for try_number 3
[2022-06-07 09:54:42,621] {scheduler_job.py:599} INFO - Executor reports execution of lowi17.華視新聞網 run_id=manual__2022-06-06T06:00:03.787848+00:00 exited with status failed for try_number 3
[2022-06-07 09:54:42,626] {scheduler_job.py:643} INFO - TaskInstance Finished: dag_id=lowi17, task_id=台灣醒報, run_id=manual__2022-06-06T06:00:03.787848+00:00, map_index=-1, run_start_date=2022-06-06 06:00:06.678844+00:00, run_end_date=2022-06-06 06:51:33.138733+00:00, run_duration=3086.459889, state=queued, executor_state=failed, try_number=3, max_tries=2, job_id=83, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2022-06-07 01:54:42.474017+00:00, queued_by_job_id=100, pid=31538
[2022-06-07 09:54:42,627] {scheduler_job.py:672} ERROR - Executor reports task instance <TaskInstance: lowi17.台灣醒報 manual__2022-06-06T06:00:03.787848+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-06-07 09:54:42,639] {scheduler_job.py:643} INFO - TaskInstance Finished: dag_id=lowi17, task_id=台灣新生報, run_id=manual__2022-06-06T06:00:03.787848+00:00, map_index=-1, run_start_date=2022-06-06 06:00:06.005933+00:00, run_end_date=2022-06-06 06:51:33.156305+00:00, run_duration=3087.150372, state=queued, executor_state=failed, try_number=3, max_tries=2, job_id=85, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2022-06-07 01:54:42.474017+00:00, queued_by_job_id=100, pid=31535
[2022-06-07 09:54:42,639] {scheduler_job.py:672} ERROR - Executor reports task instance <TaskInstance: lowi17.台灣新生報 manual__2022-06-06T06:00:03.787848+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-06-07 09:54:42,645] {scheduler_job.py:643} INFO - TaskInstance Finished: dag_id=lowi17, task_id=華視新聞網, run_id=manual__2022-06-06T06:00:03.787848+00:00, map_index=-1, run_start_date=None, run_end_date=2022-06-06 06:51:33.162201+00:00, run_duration=None, state=queued, executor_state=failed, try_number=3, max_tries=2, job_id=None, pool=default_pool, queue=default, priority_weight=1, operator=BashOperator, queued_dttm=2022-06-07 01:54:42.474017+00:00, queued_by_job_id=100, pid=None
[2022-06-07 09:54:42,645] {scheduler_job.py:672} ERROR - Executor reports task instance <TaskInstance: lowi17.華視新聞網 manual__2022-06-06T06:00:03.787848+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2022-06-07 09:54:42,672] {dagrun.py:547} ERROR - Marking run <DagRun lowi17 @ 2022-06-06 06:00:03.787848+00:00: manual__2022-06-06T06:00:03.787848+00:00, externally triggered: True> failed
[2022-06-07 09:54:42,672] {dagrun.py:607} INFO - DagRun Finished: dag_id=lowi17, execution_date=2022-06-06 06:00:03.787848+00:00, run_id=manual__2022-06-06T06:00:03.787848+00:00, run_start_date=2022-06-06 06:00:03.844994+00:00, run_end_date=2022-06-07 01:54:42.672853+00:00, run_duration=71678.827859, state=failed, external_trigger=True, run_type=manual, data_interval_start=2022-06-05 06:00:03.787848+00:00, data_interval_end=2022-06-06 06:00:03.787848+00:00, dag_hash=7f2d9c074e59bc29ace385f688864720
[2022-06-07 09:54:42,675] {dag.py:2923} INFO - Setting next_dagrun for lowi17 to 2022-06-06T06:00:03.787848+00:00, run_after=2022-06-07T06:00:03.787848+00:00

After this point, the DAG becomes invisible on the web server, as if it had never existed… I am sure I am missing some important piece of configuration. If so, which one is it?

Basically, even though there is a way to make this work by keeping the DAG files in folders with different absolute paths but the same relative layout, the most common and straightforward approach, and the one I ended up using, is to mount a shared folder on both the master node and the remote worker so that they both access the same DAG folder (see the sketch below).
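A minimal sketch of that approach, assuming the VM exports its DAG folder over NFS (the export path, subnet, and host name below are placeholders):

# on the VM (NFS server): export the DAG folder, e.g. via /etc/exports, then reload exports
/opt/airflow/dags  192.168.1.0/24(ro,sync,no_subtree_check)
sudo exportfs -ra

# on the remote worker (a Mac in this case): mount the export at the path configured as dags_folder
sudo mount -t nfs -o resvport <vm-host>:/opt/airflow/dags /Users/wilbertung/Documents/lowitest/airflow/dags

Any other sync mechanism (git-sync, a periodic rsync, an object store) works just as well, as long as the scheduler, webserver, and every worker see identical contents at their configured dags_folder.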

More details can be found here: https://github.com/apache/airflow/discussions/24275

Latest update