Airflow with KubernetesExecutor: Recorded pid does not match the current pid



I am running Airflow 2.1.4 with the KubernetesExecutor on Kubernetes 1.21.0.

I get AirflowException: Task received SIGTERM signal

Stack trace:

[2021-10-11 06:22:52,543] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 17
[2021-10-11 06:22:52,543] {taskinstance.py:1236} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-10-11 06:22:52,560] {taskinstance.py:1463} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1165, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1283, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1313, in _execute_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/a.py", line 12, in test
time.sleep(5)
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1238, in signal_handler
raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

When the task's heartbeat_callback fires (per job_heartbeat_sec in the Airflow config), the recorded pid and the current pid do not match, because this part of the code is executed: https://github.com/apache/airflow/blob/main/airflow/jobs/local_task_job.py#L201-L203

if ti.run_as_user or self.task_runner.run_as_user:
recorded_pid = psutil.Process(ti.pid).ppid()
same_process = recorded_pid == current_pid
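A minimal sketch of that comparison (the function name and the direct-PID fallback are my own simplification of the heartbeat callback, not Airflow's API; psutil is assumed to be installed and is only needed when run_as_user is set):

```python
import os
from typing import Optional


def same_process(recorded_task_pid: int, run_as_user: Optional[str]) -> bool:
    """Simplified sketch of the heartbeat same-process check above.

    When run_as_user is set, the task runs under sudo, so the PID recorded
    on the task instance belongs to a child of the real task-runner process;
    the check therefore compares against the recorded process's *parent* PID.
    """
    current_pid = os.getpid()
    if run_as_user:
        import psutil  # third-party; used here exactly as in the snippet above

        recorded_pid = psutil.Process(recorded_task_pid).ppid()
    else:
        recorded_pid = recorded_task_pid
    return recorded_pid == current_pid


# Without run_as_user, checking our own PID against itself matches:
print(same_process(os.getpid(), None))  # True
```

This is why a mismatch between run_as_user settings and the actual process tree makes the check walk one level too far (or not far enough) and report a bogus mismatch.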

I tried printing the value of run_as_user by modifying the source code:

ti.run_as_user = None
self.task_runner.run_as_user = airflow

I have tried changing the job_heartbeat_sec value; the task always fails at that point. I am running the scheduler and webserver as the airflow user, and the airflow user can also trigger the DAG. I have tried setting run_as_user in the DAG definition to None and to airflow, but I get the same error.

The airflow user is created in the Dockerfile:

RUN chown -R airflow: /usr/local/bin/airflow
RUN chown -R airflow: /root
USER airflow
WORKDIR ${AIRFLOW_HOME}

Airflow config:

[scheduler]
job_heartbeat_sec = 30
clean_tis_without_dagrun_interval = 15.0
scheduler_heartbeat_sec = 10
num_runs = -1
processor_poll_interval = 1
min_file_process_interval = 0
dag_dir_list_interval = 300
print_stats_interval = 30
pool_metrics_interval = 5.0
scheduler_health_check_threshold = 295
orphaned_tasks_check_interval = 305.0
child_process_log_directory = /root/airflow/logs/scheduler
scheduler_zombie_task_threshold = 300
catchup_by_default = True
max_tis_per_query = 512
use_row_level_locking = True
max_dagruns_to_create_per_loop = 10
max_dagruns_per_loop_to_schedule = 20
schedule_after_task_execution = False
parsing_processes = 2
file_parsing_sort_mode = modified_time
use_job_schedule = True
allow_trigger_in_future = False
dependency_detector = airflow.serialization.serialized_objects.DependencyDetector
run_duration = -1
max_threads = 2
authenticate = False

[kubernetes]
pod_template_file = /usr/local/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = ************************
worker_container_tag = ****************************
namespace = default
delete_worker_pods = False
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = False
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_pending_timeout = 300
worker_pods_pending_timeout_check_interval = 120
worker_pods_pending_timeout_batch_size = 100
airflow_configmap = airflow-configmap
airflow_local_settings_configmap = airflow-configmap

This should be fixed in 2.2.3.

One reason this can happen on any version of Airflow is setting retries on a DAG, in the parameters used when creating the DAG:

'retries': 1,
'retry_delay': timedelta(minutes=5)

This means that when the first instance of the task has been running for 5 minutes and has not finished, the next instance is started. (Airflow calls these task tries.)

In my case, a DAG task runs a docker container. The container is a long-running job, and after 5 minutes (the retry_delay) it fails with this error:

[2023-06-02, 13:12:43 UTC] {local_task_job_runner.py:271} WARNING - Recorded pid 2038103 does not match the current pid 2036364
[2023-06-02, 13:12:43 UTC] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 2036364. PIDs of all processes in the group: [2036364]
[2023-06-02, 13:12:43 UTC] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 2036364
[2023-06-02, 13:12:43 UTC] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.
[2023-06-02, 13:12:43 UTC] {docker.py:495} INFO - Stopping docker container
[2023-06-02, 13:12:46 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/airflow/providers/docker/operators/docker.py", line 478, in execute
return self._run_image()
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/airflow/providers/docker/operators/docker.py", line 339, in _run_image
return self._run_image_with_mounts(self.mounts + [tmp_mount], add_tmp_variable=True)
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/airflow/providers/docker/operators/docker.py", line 400, in _run_image_with_mounts
for log_chunk in logstream:
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/docker/types/daemon.py", line 29, in __next__
return next(self._stream)
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/docker/api/client.py", line 422, in <genexpr>
gen = (data for (_, data) in gen)
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/docker/utils/socket.py", line 113, in frames_iter_no_tty
(stream, n) = next_frame_header(socket)
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/docker/utils/socket.py", line 85, in next_frame_header
data = read_exactly(socket, 8)
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/docker/utils/socket.py", line 70, in read_exactly
next_data = read(socket, n - len(data))
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/docker/utils/socket.py", line 41, in read
poll.poll()
File "/home/ubuntu/work-nisha/airflow-poc/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1519, in signal_handler
raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2023-06-02, 13:12:46 UTC] {taskinstance.py:1345} INFO - Marking task as FAILED. dag_id=s000_docker1, task_id=docker_command, execution_date=20230602T130758, start_date=20230602T131226, end_date=20230602T131246
[2023-06-02, 13:12:46 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 613 for task docker_command (Task received SIGTERM signal; 2036364)

Compare the task instance PIDs of the first and second tries with the message "Recorded pid 2038103 does not match the current pid 2036364": they line up.

The first try's PID was 2036364, the second try's PID was 2038103.

The next try causes the first, long-running try to fail, and since Airflow kills every process in the group, the second try fails as well.
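The "Sending Signals.SIGTERM to group" lines in the log above can be reproduced with a small sketch (assuming a POSIX system with a `sleep` binary): delivering a signal to a process group terminates every member, which is why the whole first try dies at once.

```python
import os
import signal
import subprocess

# Start a child in its own process group (start_new_session=True calls setsid()).
child = subprocess.Popen(["sleep", "60"], start_new_session=True)
pgid = os.getpgid(child.pid)

# Signal the whole group, as Airflow's process_utils does in the log above;
# SIGTERM reaches every process in the group, not just one PID.
os.killpg(pgid, signal.SIGTERM)
child.wait()
print(child.returncode)  # -15: terminated by SIGTERM
```

A negative return code in subprocess means the child was killed by that signal number, matching the "Task received SIGTERM signal" exception the task raises.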

See the Airflow code: https://github.com/apache/airflow/blob/dc5bf3fd02c589578209cb0dd5b7d005b1516ae9/airflow/jobs/local_task_job_runner.py#L270

Solution:

For long-running tasks, either do not add retries to the DAG, or set retry_delay to a value by which the task instance is expected to have completed.
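A hedged sketch of the second option (the two-hour expected runtime is a made-up figure; the dict mirrors the default_args fragment shown earlier):

```python
from datetime import timedelta

# Hypothetical expected runtime of the long-running task.
expected_runtime = timedelta(hours=2)

default_args = {
    # Option 1: drop retries entirely for long-running tasks ("retries": 0).
    # Option 2: keep a retry, but delay it until well past the point
    # where the first try should already have finished.
    "retries": 1,
    "retry_delay": expected_runtime + timedelta(minutes=30),
}

print(default_args["retry_delay"] > expected_runtime)  # True
```

The key property is simply that retry_delay stays comfortably larger than the longest plausible runtime, so a retry never overlaps a still-running first try.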
