我当前正在使用气流和芹菜处理文件。工人需要下载文件,处理它们并在此之后重新上传。我的dag只有一个工人很好。但是当我添加一件事会变得复杂。
工人在可用的过程中接受任务。Worker1可以将任务"处理下载的文件"执行,但这是将任务"下载文件"的worker2,因此该任务失败了,因为它无法处理不存在的文件。
有没有办法向工人(或调度程序)指定只能在一个工人上运行DAG?我知道队列。但是我已经在使用它们。
在这种情况下,您可以拥有一个空气变量来保存所有工作人员节点名称。例如:
- 变量:
worker_list
- 值:
boxA, boxB, boxC
运行气流工作者时,您可以指定多个作业队列。for ex。: airflow worker job_queue1,job_queue2
对于您的情况,我将运行airflow worker af_<hostname>
在您的DAG代码中,只需获取该worker_list气流变量,随机选择一个盒子,然后将所有工作排队到af_<random_selected_box>
QUEUE