使用多处理队列和线程在程序退出时"EOF error"



我很难理解为什么这个简单的程序在最后引发了EOFError

我正在使用Queue()与我想自动干净地终止程序atexitThread()进行通信。

import threading
import multiprocessing
import atexit
class MyClass:
def __init__(self):
self.queue = None
self.thread = None
def start(self):
self.queue = multiprocessing.Queue()
self.thread = threading.Thread(target=self.queued_writer, daemon=True)
self.thread.start()
# Remove this: no error
self.queue.put("message")
def queued_writer(self):
while 1:
msg = self.queue.get()
print("Message:", msg)
if msg is None:
break
def stop(self):
self.queue.put(None)
self.thread.join()
instance = MyClass()
atexit.register(instance.stop)
# Put this before register: no error
instance.start()

这提高了:

Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "test.py", line 21, in queued_writer
msg = self.queue.get()
File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get
res = self._recv_bytes()
File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError

此外,这个片段的行为很奇怪:如果我删除self.queue.put("message")行,则不会引发任何错误,线程成功退出。同样,如果在atexit.register()之前调用instance.start(),这似乎有效。

有谁知道错误从哪里来?

编辑:我注意到使用SimpleQueue()似乎使错误消失。

该问题来自多个atexit.register()调用之间的冲突。

文档指出:

atexit以与注册顺序相反的顺序运行这些函数;如果你注册了ABC,在解释器终止时,它们将按照CBA的顺序运行。

[...]

假设较低级别的模块通常会在较高级别的模块之前导入,因此必须在以后进行清理。

通过首先导入multiprocessing然后调用atexit.register(my_stop),您会期望在任何内部终止过程之前执行停止函数...但事实并非如此,因为atexit.register()可以动态调用。

在本例中,multiprocessing库利用_exit_function函数,该函数旨在干净地关闭内部线程和队列。此函数在模块级别注册atexit,但仅在初始化Queue()对象后加载模块。

因此,MyClass停止函数在multiprocessing的函数之前注册,因此instance.stop_exit_function之后被调用。

在终止期间,_exit_function关闭内部管道连接,因此,如果线程稍后尝试使用关闭的读取连接调用.get(),则会引发EOFError。仅当 Python 没有时间在最后自动终止daemon线程时,才会发生这种情况,也就是说,如果"慢速"退出函数(如time.sleep(0.1)或在这种情况下为thread.join())在通常的关闭过程之后注册并运行。由于某种原因,写入连接关闭会延迟.put()因此不会立即引发错误。

至于为什么对代码段进行小的修改使其起作用:SimpleQueue没有Finalizer所以内部管道稍后关闭。Queue的内螺纹直到调用第一个.put()才启动,因此移除它意味着没有管道要关闭。也可以通过导入multiprocessing.queues来强制注册。

要实现它,您可以在类中定义__enter____exit__,并使用with语句创建实例:

import threading
import multiprocessing

class MyClass:
def __init__(self):
self.queue = None
self.thread = None
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.stop()
def start(self):
self.queue = multiprocessing.Queue()
self.thread = threading.Thread(target=self.queued_writer, daemon=True)
self.thread.start()
def queued_writer(self):
while 1:
msg = self.queue.get()
print("Message:", str(msg))
if msg is None:
break
def put(self, msg):
self.queue.put(msg)
def stop(self):
self.queue.put(None)
self.thread.join()

with MyClass() as instance:
instance.start()
print('Thread stopped: ' + str(instance.thread._is_stopped))
instance.put('abc')
print('Thread stopped: ' + str(instance.thread._is_stopped))

上面的代码给出了一个输出:

Thread stopped: False
Message: abc
Message: None
Thread stopped: True

您的问题的表面答案相当简单,当主进程结束时,queued_writer进程仍在等待将条目写入队列,将 EOF 发送到self.queue.get打开的打开阻塞连接。

这就提出了一个问题,为什么atexit.register似乎没有做它的工作,但我不知道原因。

相关内容

  • 没有找到相关文章

最新更新