远程队列使用者在重新启动后错过第一条消息



我有如下代码:

服务器.py

import queue
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
q = queue.Queue()
QueueManager.register('queue', callable=lambda:q)
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
s = m.get_server()
s.serve_forever()

生产者.py

from multiprocessing.managers import BaseManager
import time
class QueueManager(BaseManager):
pass
QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()
idx = 0
while True:
time.sleep(2)
queue.put(idx)
idx += 1

消费者.py

from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()
while True:
message = queue.get()
print(message)

如果我运行服务器和生产者,然后启动消费者,我会看到生产者放入队列中的所有消息都出现在消费者处。但是,如果我停止使用者并立即重新启动它,它总是跳过一条消息。

为了说明我所看到的消费者的输出。py:

0
1
2
3
<restart the consumer>
5
6
7
etc.

这就是python多处理队列的工作方式吗?是错误还是我做错了什么?

我认为问题在于python中管道的实现方式,或者它甚至可能是操作系统的限制。这是完整的堆栈跟踪:

Traceback (most recent call last):
File "consumer.py", line 12, in <module>
message = queue.get()
File "<string>", line 2, in get
File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 757, in _callmethod
kind, result = conn.recv()
File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt

该值似乎在queue.get()调用中丢失,该调用没有正确地用SIGINT终止。queue.get()会立即被取消,所以python不会完成get()调用然后丢失值。看起来更像是python没有正确地取消管道上的recv

如果您将消费者更改为:

while True:
while queue.empty():
sleep(0.1)
message = queue.get()
print(message)

它会起作用的。但这当然是一个变通办法,而不是真正的解决方案。

更新:

在玩了更多的代码之后,我认为这是一个错误,因为:

  1. 您逐一遵循了他们的编码示例
  2. 任何类型的队列都不能解决问题(multiprocessing.Queuemultiprocessing.JoinableQueue
  3. 发送task_done()也无济于事

该错误同时发生在python2和python3上。我建议您将此报告为错误。在最坏的情况下,如果它不是一个bug,您至少可以得到python为什么会这样做的解释。

最新更新