我刚刚用芹菜执行器设置了气流,这是我的DAG的骨架
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t3 = BashOperator(
task_id='print_host',
bash_command='hostname >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2 = BashOperator(
task_id='print_uptime',
bash_command='uptime >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2.set_upstream(t3)
t2.set_upstream(t1)
我有2名工人。其中一个只运行一个名为local_queue
的队列,另一个运行两个名为local_queue,test_queue
的队列
我只想在 1 台计算机上运行任务1,但在两台计算机上运行任务 2 和 3。 即在仅运行 local_queue 的工作线程 1 上,T2 和 T3 应该运行,在同时运行 local_queue 和test_queue的工作线程 2 上,所有 3 个(t1、t2 和 t3)都应该运行。 任务运行的总数应为 5。
但是,当我运行它时,只运行 3 个任务。 1) print_date为工作线程 2 运行(这是正确的) 2) print_host仅针对工作人员 1 运行(不正确。应该为两个工人运行)和 3) print_uptime 仅针对工作线程 2 运行(也不正确。应该为两个工人运行)
您能否指导我如何设置它,以便运行 5 个任务。在生产中,我想通过将机器分组到队列中来管理机器,对于所有具有 QUEUE_A -> do X 的机器和所有具有 QUEUE_B -> do Y 等的机器。
谢谢
与其让一个工作人员工作 2 个队列,不如让每个工作人员工作一个队列。 因此,辅助角色命令应如下所示:
airflow worker -q test_queue
airflow worker -q local_queue
然后有两个相同的任务,但在不同的 queus 中。
dag = DAG('dummy_for_testing', default_args=default_args)
t1 = BashOperator(
task_id='print_date',
bash_command='date >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t3 = BashOperator(
task_id='print_host',
bash_command='hostname >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t3_2 = BashOperator(
task_id='print_host_2',
bash_command='hostname >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t2 = BashOperator(
task_id='print_uptime',
bash_command='uptime >> /tmp/dag_output.log',
queue='local_queue',
dag=dag)
t2_2 = BashOperator(
task_id='print_uptime_2',
bash_command='uptime >> /tmp/dag_output.log',
queue='test_queue',
dag=dag)
t2.set_upstream(t3)
t2.set_upstream(t3_2)
t2.set_upstream(t1)
t2_2.set_upstream(t3)
t2_2.set_upstream(t3_2)
t2_2.set_upstream(t1)