如果工作人员本身发生故障,运行在Celery工作人员身上的气流传感器会发生什么



用例:DAG定义了一个Rest API任务(使用RestOperator(,该任务命中应用程序API并开始执行执行执行某些业务功能的流程/任务。它的执行状态由气流传感器监测,气流传感器通过API调用轮询任务执行完成状态。

问题:

  1. 如果Celery节点发生故障,在该节点上运行的Sensor会发生什么
  2. 如果传感器与工人一起死亡,如何将传感器执行传播到另一个节点(以避免功能损失(
from datetime import datetime, timedelta
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http import SimpleHttpOperator
default_args = {
'start_date': datetime.today().strftime('%Y-%m-%d'),
'end_date': None
}
dag = DAG(
'Rest Monitor',
default_args=default_args,
schedule_interval=None,
catchup=False)
HttpOperator = SimpleHttpOperator(
task_id='RestOperator',
method='POST',
endpoint='https://localhost:8080/api/task/execute',
headers={
"Content-Type": "application/json"
},
data={
"taskId": "1234",
}
},
dag = dag )
def resp_check():
return "True if Status = Success"
HttpSensor = HttpSensor(
dag = dag,
task_id = 'http_sensor_head_method',
http_conn_id = 'http_default',
endpoint = 'https://localhost:8080/api/task/1234/status',
request_params = {},
method = 'HEAD',
response_check = resp_check,
timeout = 5,
poke_interval = 1)

HttpOperator >> HttpSensor

此逻辑表面上由Airflow核心及其通过Scheduler协调工作人员进行管理。基本上,调度程序定期轮询或从工作节点发送任务状态/日志数据。

从本质上讲,Scheduler明确地管理来自工作人员的数据以及工作人员应该管理的任务的数据。如果工作人员没有响应,或者没有返回足够的有关任务的数据。计划程序将开始它的关闭任务和失败尝试逻辑。

如果该任务重试次数更多,则会将该任务的新副本添加到队列中,以便群集中的任何活动工作人员拾取。

相关内容

最新更新