Sending a termination signal to the Airflow Docker operator



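I'm facing a problem with Airflow's Docker operator. After a task is marked as failed, I need it to exit gracefully. Airflow sends SIGTERM to the task, but the script inside the Docker container never receives the SIGTERM and is killed without a graceful exit. I tried changing the 'stop_signal', 'stop_grace_period' and 'init' parameters of the Docker API's ContainerSpec object, but that did not help. How can I forward SIGTERM or SIGINT from Airflow to the script in the Docker container?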

PS: If I start this script locally in Docker and then kill it with the following command: 'docker kill --signal=INT ....'
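For reference, graceful exit only works if the script itself handles the signal once it arrives. The following is a minimal sketch of that pattern, assuming the container script is written in Python and is the process that actually receives the signal (for example, not hidden behind a shell wrapper). The names `stop_requested`, `request_stop`, `install_handlers` and `main_loop` are hypothetical and not taken from the question.

```python
import signal
import threading

# Set by the signal handler; the main loop checks it between units of work.
stop_requested = threading.Event()


def request_stop(signum, frame):
    """Signal handler: only record that a stop was requested."""
    stop_requested.set()


def install_handlers():
    """Register the same handler for SIGTERM and SIGINT (main thread only)."""
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, request_stop)


def main_loop(poll_seconds=0.5):
    """Run until a stop is requested, then return after cleanup."""
    install_handlers()
    while not stop_requested.is_set():
        # One unit of real work would go here (hypothetical placeholder).
        stop_requested.wait(poll_seconds)
    # Cleanup (flush files, close connections) would go here.
    return "stopped cleanly"


if __name__ == "__main__":
    print(main_loop())
```

Keeping the handler down to setting a flag means the cleanup runs in the normal control flow rather than inside the handler itself.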

Option 1: The recommended approach is to kill the child processes (the process tree) first, and then kill the main process.

For a safe exit, kill the process tree first, then kill the process itself. The two steps are shown below; you can modify them, or wrap them in a custom kill operator built on Airflow's kill command.

Step 1

import signal

import psutil

# Placeholder value in seconds (an assumption); the snippet as posted does not
# define this constant.
DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = 60


def kill_process_tree(logger, pid, timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
    """
    Terminates a process and its descendants: SIGTERM first, then SIGKILL
    for anything still alive after the timeout. Note: killing via PIDs
    has the potential to kill the wrong process if the process dies and the
    PID gets recycled in a narrow time window.

    :param logger: logger
    :type logger: logging.Logger
    :param pid: process id of the root process
    :type pid: int
    :param timeout: time (seconds) to wait on a process to terminate before
        attempting to SIGKILL
    :type timeout: int
    """
    try:
        root_process = psutil.Process(pid)
    except psutil.NoSuchProcess:
        logger.warning("PID: {} does not exist".format(pid))
        return

    # Check child processes to reduce cases where a child process died but the PID
    # got reused
    running_descendants = [
        x for x in root_process.children(recursive=True) if x.is_running()
    ]

    if len(running_descendants) != 0:
        logger.info("Terminating descendant processes of {} PID: {}"
                    .format(root_process.cmdline(), root_process.pid))
        _, running_descendants = kill_processes(logger, running_descendants,
                                                sig=signal.SIGTERM, timeout=timeout)
    else:
        logger.debug("There are no descendant processes to terminate.")

    logger.info("Terminating root process PID: {}.".format(root_process.pid))
    _, running_root = kill_processes(logger, [root_process],
                                     sig=signal.SIGTERM, timeout=timeout)

    # Anything that survived SIGTERM is force-killed
    if running_root or running_descendants:
        kill_processes(logger, running_root + running_descendants, sig=signal.SIGKILL)

Step 1b: now kill the processes

def kill_processes(logger, processes, sig=signal.SIGTERM, timeout=None):
    # Signal every process, then wait for them together
    for p in processes:
        try:
            if sig == signal.SIGTERM:
                logger.info("Terminating process {} PID: {}".format(p.cmdline(), p.pid))
                p.terminate()
            elif sig == signal.SIGKILL:
                logger.info("Killing process {} PID: {}".format(p.cmdline(), p.pid))
                p.kill()
        except psutil.NoSuchProcess:
            logger.warning("Process {} no longer exists".format(p.pid))
        except Exception as e:
            logger.error("Encountered exception while attempting to kill pid "
                         "{} with signal {}:\n{}".format(p.pid, sig, e))

    logger.info("Waiting up to {}s for processes to exit...".format(timeout))
    # Returns a (gone, alive) tuple of process lists
    return psutil.wait_procs(processes, timeout)
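The core idea in both steps is escalation: ask politely with SIGTERM, wait, and only then force with SIGKILL. The sketch below shows that idea in isolation. It uses only the standard library `subprocess` module instead of psutil, handles a single child process rather than a whole tree, and assumes a POSIX system. The name `terminate_then_kill` is hypothetical.

```python
import subprocess
import sys


def terminate_then_kill(proc, timeout=5.0):
    """Send SIGTERM, wait up to `timeout` seconds, then fall back to SIGKILL.

    Returns the child's return code. On POSIX this is the negative signal
    number when the child was ended by a signal.
    """
    proc.terminate()  # SIGTERM on POSIX
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX; cannot be caught or ignored
        return proc.wait()


if __name__ == "__main__":
    # A child that would otherwise sleep for a minute.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    print("child exited with return code", terminate_then_kill(child))
```

The timeout is the grace period: a child that handles SIGTERM and cleans up within it never sees SIGKILL.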

Option 2:

Kill the container / worker machine

// From a worker machine
airflow celery stop
