Intermittent multiprocessing.Queue failures: a bug in Python?



Python's multiprocessing.Queue fails intermittently, and I can't figure out why. Is this a bug in Python or in my script?

Minimal failing script

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()

proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()
proc.join()

I'm testing this with CPython 3.6.6 on Debian. It also fails with the Docker python:3.7.0-alpine image.

docker run --rm -v "${PWD}/test.py:/test.py" \
    python:3-alpine python3 /test.py

The script above fails intermittently with a BrokenPipeError.

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 240, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

Test harness

Because the failure is intermittent, I wrote a shell script that invokes the test many times and counts the failures.

#!/bin/sh
total=10
successes=0
for i in `seq ${total}`
do
    if ! docker run --rm -v "${PWD}/test.py:/test.py" python:3-alpine \
        python3 test.py 2>&1 \
        | grep --silent BrokenPipeError
    then
        successes=$(expr ${successes} + 1)
    fi
done
python3 -c "print(${successes} / ${total})"

This typically prints some fraction, e.g. 0.2, indicating intermittent failure.

Timing adjustments

If I insert time.sleep(0.01) before queue.close(), it works consistently. I noticed in the source code that writing happens in its own thread. I suspect that if the write thread is still trying to write the data while all the other threads close the queue, it causes the error.
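
For reference, here is the worker with the sleep uncommented. This is only a timing workaround: it gives the feeder thread a chance to write before the queue is closed.

def worker(queue):
    queue.put('abcdefghijklmnop')
    time.sleep(0.01)  # lets the feeder thread flush the item into the pipe before close()
    queue.close()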

Debug logs

By uncommenting the first few lines, I can trace failing and successful executions.

Failure:

[DEBUG/MainProcess] created semlock with handle 140480257941504
[DEBUG/MainProcess] created semlock with handle 140480257937408
[DEBUG/MainProcess] created semlock with handle 140480257933312
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

"成功"(真正无声的失败,只能用Python 3.6复制(:

[DEBUG/MainProcess] created semlock with handle 139710276231168
[DEBUG/MainProcess] created semlock with handle 139710276227072
[DEBUG/MainProcess] created semlock with handle 139710276222976
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[INFO/Process-1] error in queue thread: [Errno 32] Broken pipe
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

True success (with either time.sleep(0.01) uncommented):

[DEBUG/MainProcess] created semlock with handle 140283921616896
[DEBUG/MainProcess] created semlock with handle 140283921612800
[DEBUG/MainProcess] created semlock with handle 140283921608704
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/Process-1] Queue._after_fork()
[INFO/Process-1] child process calling self.run()
[DEBUG/Process-1] Queue._start_thread()
[DEBUG/Process-1] doing self._thread.start()
[DEBUG/Process-1] starting thread to feed data to pipe
[DEBUG/Process-1] ... done self._thread.start()
[DEBUG/Process-1] telling queue thread to quit
[INFO/Process-1] process shutting down
[DEBUG/Process-1] feeder thread got sentinel -- exiting
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[DEBUG/Process-1] joining queue thread
[DEBUG/Process-1] ... queue thread joined
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

The difference seems to be that, in the truly successful case, the feeder thread receives the sentinel object before the atexit handlers run.

The primary issue with your code is that nobody consumes what the worker process puts on the queue. Python queues expect the data in a queue to be consumed ("flushed to the pipe") before the process that put it there is terminated.
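
To illustrate (a sketch of mine, not part of the original answer): if the parent consumes the item before joining, the feeder thread can always flush, and the error disappears. Only the parent side of your script changes:

proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()
print(queue.get())  # consume the item so the child's feeder thread can flush and exit cleanly
queue.close()
proc.join()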

From that perspective, your example doesn't make much sense, but if you want to make it work as-is, without adding a consumer:

The key is queue.cancel_join_thread() (see https://docs.python.org/3/library/multiprocessing.html):

Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue.
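
To illustrate that last note (a minimal sketch of mine, not from the original answer): a manager-backed queue lives in a separate server process, so there is no per-process feeder thread to race with at shutdown.

import multiprocessing

def worker(queue):
    queue.put('abcdefghijklmnop')

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        queue = manager.Queue(maxsize=10)  # proxy to a queue in the manager's server process
        proc = multiprocessing.Process(target=worker, args=(queue,))
        proc.start()
        proc.join()  # safe even though nothing ever consumes the item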

The warning quoted above is the relevant bit. The problem is that items are put on the queue from the child process but never consumed by anyone. In that case, cancel_join_thread must be called on the queue in the CHILD process before asking it to join. This code sample eliminates the error:

import multiprocessing
import time
import logging
import multiprocessing.util
multiprocessing.util.log_to_stderr(level=logging.DEBUG)

queue = multiprocessing.Queue(maxsize=10)

def worker(queue):
    queue.put('abcdefghijklmnop')

    # "Indicate that no more data will be put on this queue by the
    # current process." --Documentation
    # time.sleep(0.01)
    queue.close()

    # Ideally this would not be here but would instead be a response to a
    # signal (or other IPC message) sent from the main process.
    queue.cancel_join_thread()

proc = multiprocessing.Process(target=worker, args=(queue,))
proc.start()

# "Indicate that no more data will be put on this queue by the current
# process." --Documentation
# time.sleep(0.01)
queue.close()
proc.join()

I didn't bother implementing that IPC here, since there is no consumer at all, but I hope the idea is clear.
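
To sketch what that IPC could look like (the Event-based design and its names are my assumptions, not from the original answer): the main process sets a multiprocessing.Event once it decides it will never consume the queued data, and the child calls cancel_join_thread only in response.

import multiprocessing

def worker(queue, abandoned):
    queue.put('abcdefghijklmnop')
    abandoned.wait()            # block until the parent signals the data will never be consumed
    queue.close()
    queue.cancel_join_thread()  # only now is it intentional to drop unflushed items

if __name__ == '__main__':
    queue = multiprocessing.Queue(maxsize=10)
    abandoned = multiprocessing.Event()  # hypothetical signal; any IPC mechanism would do
    proc = multiprocessing.Process(target=worker, args=(queue, abandoned))
    proc.start()
    abandoned.set()   # the parent declares it will not consume the item
    queue.close()
    proc.join()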
