线程已完成的进程永远不会退出



为什么线程要保持并阻止其进程退出,即使在完成目标之后也是如此

虽然这个问题使用了一个额外的子进程,但底层问题完全源于多线程。因此,这个基本问题可以单独用MainProcess来再现。(由@Darkonaut编辑)

我创建了一个继承multiprocessing.Process:的类

class Task(Process):
def run(self) :
print("RUN")
t = threading.Thread(target=do_some_work)
t.start()
# ...
t.join()
print("CLOSED")

我是这样开始的:

proc = Task()
proc.start()
proc.join()
print("JOINED")

但它不会加入,输出将是这样的:

>> RUN
>> CLOSED

我没有使用任何类型的QueuesPipes

当我在Ubuntu上运行这个程序时,我用它的pid跟踪进程。即使在print("CLOSED")行完成之后,该过程仍然存在,没有任何例外。我还在Windows上运行了这个,并在任务管理器中跟踪了这个过程。进程在print("CLOSED")之后退出,并且它仍然没有加入。

另一点是,在Ubuntu上,当print("CLOSED")之后一切都被卡住,我按下Ctrl + C时,我得到的是:

Traceback (most recent call last):
File "Scheduler.py", line 164, in <module>
scheduler.start()
File "Scheduler.py", line 152, in start
self.enqueueTask(plan)
File "Scheduler.py", line 134, in enqueueTask
proc.join()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/usr/local/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
pid, sts = os.waitpid(self.pid, flag)

根据最后一行,我想主要的过程是在等待什么,但什么以及为什么?

问题似乎出在我在Taskrun()方法中启动的一个非守护进程线程上让这个线程成为守护进程线程解决了这个问题,所以我可以肯定地说,即使在MainThread完成后,这个线程也会阻止我的进程关闭。我仍然感到困惑,因为那个非守护进程线程的目标函数已经成功完成。

为什么线程要持久存在并阻止其进程退出,即使在完成目标之后也是如此

虽然这个问题使用了额外的子进程,但底层问题完全源于多线程。因此,这个基本问题可以用MainProcess单独再现。在编辑2中可以找到涉及额外子进程的答案。


场景

在没有看到您的子进程中的新线程实际在做什么的情况下,您观察到的行为的一种可能情况是您的thread-1正在启动另一个thread-2,您可能甚至不知道。它可能是从您正在调用的第三方库启动的,或者为了留在stdlib中,multiprocessing.Queue.put()还会在后台启动一个馈送线程。

这个通用场景不是Process子类化问题,也与从子进程本身调用Process.close()无关(使用不正确,但没有后果)。

进程中的MainThread始终是进程中退出的最后一个线程,并且作为其_shutdown()例程的一部分,它正在加入非守护进程线程。这就是MainThread在其"表面"工作已经完成的情况下保持不稳定状态的原因。

问题是我在Task的run()方法中启动的一个非守护进程线程。所以我可以肯定地说,线程正在阻止我的进程关闭,即使在它的MainThread完成之后也是如此。但我仍然感到困惑,因为那个非守护进程线程的目标函数已经成功完成。

现在,在图中的场景中,thread-1的目标函数实际上可以成功完成。然而,这个thread-1启动了另一个thread-2,然后它会做一些持续很长时间的事情,比如在最坏的情况下永远阻止。

Q:如果thread-1本身不是问题,为什么当你把thread-1变成daemon时没有挂起?

这是因为守护进程标志的"初始值是从创建线程继承的"。因此,使thread-1成为daemon,也使其子代thread-2成为daemon,除非明确设置了thread-2daemon-标志。守护进程在关闭时不会加入,整个进程"在没有活动的非守护进程线程时退出"。

请注意,在Python 3.7之前,Process创建的非守护进程线程已加入而未加入。对于MainProcess以外的线程,这种发散行为已在bpo-18966中得到固定。


代码

为了显示这种场景已经可以通过更简单的设置进行复制,下面的示例使用MainProcess作为不会退出的进程。这里的thread-2是一个Timer线程,它将在10秒后启动并调用threading.Barrier(parties=1).wait()。然后,此.wait()调用将立即用parties=1完成,或者用parties=2永远阻止,因为在我们的设置中不存在在此Barrier上调用.wait()的其他方。这使得我们可以很容易地切换想要复制的行为。

import threading
def blackbox(parties):
"""Dummy for starting thread we might not know about."""
timer = threading.Timer(10, threading.Barrier(parties=parties).wait)  # Thread-2
timer.name = "TimerThread"
timer.start()

def t1_target(parties):  # Thread-1
"""Start another thread and exit without joining."""
logger = get_mp_logger()
logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")
blackbox(parties)
logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")
logger.info("DONE")

if __name__ == '__main__':
import logging
parties = 1
daemon = False
print(f"parties={parties}, daemon={daemon}")
logger = get_mp_logger(logging.INFO)
logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")
t = threading.Thread(target=t1_target, args=(parties,), daemon=daemon)
t.start()
t.join()
logger.info(f"ALIVE: {[t.name for t in threading.enumerate()]}")    
logger.info("DONE")

下面的日志是针对parties=1的,因此不存在无限阻塞,但由于thread-2不是守护进程线程,MainThread将在关闭时加入它。注意,在t1_target完成之后,TimerThread仍然是活动的。这里主要感兴趣的是MainThread"DONE""process shutting down"需要大约10秒。这是TimerThread还活着的10秒。

parties=1, daemon=False
[18:04:31,977 MainThread <module>] ALIVE: ['MainThread']
[18:04:31,977 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1']
[18:04:31,978 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1', 'TimerThread']
[18:04:31,978 Thread-1 t1_target] DONE
[18:04:31,978 MainThread <module>] ALIVE: ['MainThread', 'TimerThread']
[18:04:31,978 MainThread <module>] DONE
[18:04:41,978 MainThread info] process shutting down
Process finished with exit code 0

有了parties=2,它永远挂在这个阶段,。。。

parties=2, daemon=False
[18:05:06,010 MainThread <module>] ALIVE: ['MainThread']
[18:05:06,010 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1']
[18:05:06,011 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1', 'TimerThread']
[18:05:06,011 Thread-1 t1_target] DONE
[18:05:06,011 MainThread <module>] ALIVE: ['MainThread', 'TimerThread']
[18:05:06,011 MainThread <module>] DONE

除非同时为thread-1(thread-2继承)或仅直接为thread-2设置daemon=True

parties=2, daemon=True
[18:05:35,539 MainThread <module>] ALIVE: ['MainThread']
[18:05:35,539 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1']
[18:05:35,539 Thread-1 t1_target] ALIVE: ['MainThread', 'Thread-1', 'TimerThread']
[18:05:35,539 Thread-1 t1_target] DONE
[18:05:35,539 MainThread <module>] ALIVE: ['MainThread', 'TimerThread']
[18:05:35,539 MainThread <module>] DONE
[18:05:35,539 MainThread info] process shutting down
Process finished with exit code 0

助手

DEFAULT_MP_FORMAT = 
'[%(asctime)s,%(msecs)03d %(threadName)s %(funcName)s]' 
' %(message)s'
DEFAULT_DATEFORMAT = "%H:%M:%S"  # "%Y-%m-%d %H:%M:%S"

def get_mp_logger(level=None, fmt=DEFAULT_MP_FORMAT, datefmt=DEFAULT_DATEFORMAT):
"""
Initialize multiprocessing-logger if needed and return reference.
"""
import multiprocessing.util as util
import logging
logger = util.get_logger()
if not logger.handlers:
logger = util.log_to_stderr(level)
logger.handlers[0].setFormatter(logging.Formatter(fmt, datefmt))
return logger

相关内容

  • 没有找到相关文章

最新更新