我很难理解为什么这个简单的程序在最后引发了EOFError
。
我正在使用Queue()
与我想自动干净地终止程序atexit
的Thread()
进行通信。
import threading
import multiprocessing
import atexit
class MyClass:
def __init__(self):
self.queue = None
self.thread = None
def start(self):
self.queue = multiprocessing.Queue()
self.thread = threading.Thread(target=self.queued_writer, daemon=True)
self.thread.start()
# Remove this: no error
self.queue.put("message")
def queued_writer(self):
while 1:
msg = self.queue.get()
print("Message:", msg)
if msg is None:
break
def stop(self):
self.queue.put(None)
self.thread.join()
instance = MyClass()
atexit.register(instance.stop)
# Put this before register: no error
instance.start()
这提高了:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 21, in queued_writer
msg = self.queue.get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
此外,这个片段的行为很奇怪:如果我删除self.queue.put("message")
行,则不会引发任何错误,线程成功退出。同样,如果在atexit.register()
之前调用instance.start()
,这似乎有效。
有谁知道错误从哪里来?
编辑:我注意到使用SimpleQueue()
似乎使错误消失。
该问题来自多个atexit.register()
调用之间的冲突。
文档指出:
atexit
以与注册顺序相反的顺序运行这些函数;如果你注册了A
、B
和C
,在解释器终止时,它们将按照C
、B
、A
的顺序运行。[...]
假设较低级别的模块通常会在较高级别的模块之前导入,因此必须在以后进行清理。
通过首先导入multiprocessing
然后调用atexit.register(my_stop)
,您会期望在任何内部终止过程之前执行停止函数...但事实并非如此,因为atexit.register()
可以动态调用。
在本例中,multiprocessing
库利用_exit_function
函数,该函数旨在干净地关闭内部线程和队列。此函数在模块级别注册atexit
,但仅在初始化Queue()
对象后加载模块。
因此,MyClass
停止函数在multiprocessing
的函数之前注册,因此instance.stop
在_exit_function
之后被调用。
在终止期间,_exit_function
关闭内部管道连接,因此,如果线程稍后尝试使用关闭的读取连接调用.get()
,则会引发EOFError
。仅当 Python 没有时间在最后自动终止daemon
线程时,才会发生这种情况,也就是说,如果"慢速"退出函数(如time.sleep(0.1)
或在这种情况下为thread.join()
)在通常的关闭过程之后注册并运行。由于某种原因,写入连接关闭会延迟.put()
因此不会立即引发错误。
至于为什么对代码段进行小的修改使其起作用:SimpleQueue
没有Finalizer
所以内部管道稍后关闭。Queue
的内螺纹直到调用第一个.put()
才启动,因此移除它意味着没有管道要关闭。也可以通过导入multiprocessing.queues
来强制注册。
要实现它,您可以在类中定义__enter__
和__exit__
,并使用with
语句创建实例:
import threading
import multiprocessing
class MyClass:
def __init__(self):
self.queue = None
self.thread = None
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.stop()
def start(self):
self.queue = multiprocessing.Queue()
self.thread = threading.Thread(target=self.queued_writer, daemon=True)
self.thread.start()
def queued_writer(self):
while 1:
msg = self.queue.get()
print("Message:", str(msg))
if msg is None:
break
def put(self, msg):
self.queue.put(msg)
def stop(self):
self.queue.put(None)
self.thread.join()
with MyClass() as instance:
instance.start()
print('Thread stopped: ' + str(instance.thread._is_stopped))
instance.put('abc')
print('Thread stopped: ' + str(instance.thread._is_stopped))
上面的代码给出了一个输出:
Thread stopped: False
Message: abc
Message: None
Thread stopped: True
您的问题的表面答案相当简单,当主进程结束时,queued_writer进程仍在等待将条目写入队列,将 EOF 发送到self.queue.get
打开的打开阻塞连接。
这就提出了一个问题,为什么atexit.register
似乎没有做它的工作,但我不知道原因。