Python multithreading + multiprocessing BrokenPipeError (child processes not exiting?)



I get a BrokenPipeError when threads that use a multiprocessing.JoinableQueue spawn processes. It seems to happen after the program has finished its work and is trying to exit, because it does everything it is supposed to do. What does it mean, and is there a way to fix it?

import requests
import multiprocessing
from multiprocessing import JoinableQueue
from queue import Queue
import threading

class ProcessClass(multiprocessing.Process):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.func = func
    def run(self):
        while True:
            arg = self.in_queue.get()
            self.func(arg, self.out_queue)
            self.in_queue.task_done()

class ThreadClass(threading.Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.func = func
    def run(self):
        while True:
            arg = self.in_queue.get()
            self.func(arg, self.out_queue)
            self.in_queue.task_done()

def get_urls(host, out_queue):
    r = requests.get(host)
    out_queue.put(r.text)
    print(r.status_code, host)

def get_title(text, out_queue):
    print(text.strip('\r\n ')[:5])

if __name__ == '__main__':
    def test():
        q1 = JoinableQueue()
        q2 = JoinableQueue()
        for i in range(2):
            t = ThreadClass(get_urls, q1, q2)
            t.daemon = True
            t.start()
        for i in range(2):
            t = ProcessClass(get_title, q2, None)
            t.daemon = True
            t.start()
        for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
            q1.put(host)
        q1.join()
        q2.join()
    test()
    print('Finished')

Program output:

200 http://ibm.com
<!DOC
200 http://google.com
<!doc
200 http://yahoo.com
<!DOC
200 http://apple.com
<!DOC
200 http://amazon.com
<!DOC
Finished
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python33\lib\multiprocessing\connection.py", line 313, in _recv_bytes
    nread, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109] The pipe has been ended

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python33\lib\threading.py", line 901, in _bootstrap_inner
    self.run()
  File "D:\Progs\Uspat\uspat\spider\run\threads_test.py", line 31, in run
    arg = self.in_queue.get()
  File "C:\Python33\lib\multiprocessing\queues.py", line 94, in get
    res = self._recv()
  File "C:\Python33\lib\multiprocessing\connection.py", line 251, in recv
    buf = self._recv_bytes()
  File "C:\Python33\lib\multiprocessing\connection.py", line 322, in _recv_bytes
    raise EOFError
EOFError
....

(The same error repeated for the other threads, trimmed.)

If I switch the JoinableQueue to a queue.Queue for the multithreading part, everything is fixed. But why?

This happens because you have daemon threads blocked in a multiprocessing.Queue.get call when the main thread exits, but it only occurs under certain conditions:

  1. A daemon thread is running and blocked inside multiprocessing.Queue.get when the main thread exits.
  2. A multiprocessing.Process is running.
  3. The multiprocessing context is not 'fork'.

The exception is telling you that the other end of the Connection that the multiprocessing.JoinableQueue is listening on sent an EOF while a get() call was in progress. Generally this means the other side of the Connection has been closed. It makes sense that this happens during shutdown: Python cleans up all objects prior to exiting the interpreter, and part of that cleanup involves closing all open Connection objects. What I haven't been able to figure out is why it only happens if a multiprocessing.Process has been spawned (not forked, which is why it doesn't happen on Linux by default) and is still running. I can even reproduce it with a multiprocessing.Process that just sleeps in a while loop and doesn't use any Queue objects at all. For whatever reason, the existence of a running, spawned child process seems to guarantee the exception is raised. It may simply be that it causes things to be destroyed in exactly the right order for the race condition to occur, but that's a guess.

In any case, using a queue.Queue instead of a multiprocessing.JoinableQueue is a good way to fix it, since multiprocessing.Queue isn't actually needed there. You can also make sure the background threads and/or background processes shut down before the main thread by sending a sentinel into their queues. So, make both run methods check for the sentinel:

def run(self):
    for arg in iter(self.in_queue.get, None):  # None is the sentinel
        self.func(arg, self.out_queue)
        self.in_queue.task_done()
    self.in_queue.task_done()  # mark the sentinel itself as done
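The `iter(get, sentinel)` idiom above can be exercised on its own with a plain `queue.Queue` (a minimal sketch; `worker`, `results`, and the doubling step are illustrative, not part of the original code):

```python
import queue
import threading

q = queue.Queue()
results = []

def worker():
    # iter(q.get, None) keeps calling q.get() and ends the loop as soon
    # as it returns the sentinel (None).
    for item in iter(q.get, None):
        results.append(item * 2)
        q.task_done()
    q.task_done()  # balance the put() that delivered the sentinel

t = threading.Thread(target=worker)
t.start()

for n in (1, 2, 3):
    q.put(n)
q.put(None)  # the sentinel
t.join()     # the worker has exited its loop
q.join()     # all four put() calls have been matched by task_done()
print(results)  # → [2, 4, 6]
```

Because `t.join()` returns only after the worker sees the sentinel, the main thread knows the background thread is finished before it continues with shutdown.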

Then send the sentinels once you're done:

    threads = []
    for i in range(2):
        t = ThreadClass(get_urls, q1, q2)
        t.daemon = True
        t.start()
        threads.append(t)
    procs = []
    for i in range(2):
        t = ProcessClass(get_title, q2, None)
        t.daemon = True
        t.start()
        procs.append(t)
    for host in ("http://ibm.com", "http://yahoo.com", "http://google.com", "http://amazon.com", "http://apple.com",):
        q1.put(host)
    q1.join()
    # All items have been consumed from the input queue; let's start shutting down.
    for t in procs:
        q2.put(None)
        t.join()
    for t in threads:
        q1.put(None)
        t.join()
    q2.join()
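As for the queue.Queue fix from the question: it works because q1 only ever connects threads inside the main process, while only q2 crosses a process boundary. queue.Queue provides the same task_done()/join() interface, so ThreadClass.run works unchanged. A minimal sketch of that variant:

```python
import queue
from multiprocessing import JoinableQueue

# q1: main thread -> worker threads. Same process, so a plain queue.Queue
# works, avoids the multiprocessing Connection entirely, and supports the
# same task_done()/join() protocol used by ThreadClass.run.
q1 = queue.Queue()

# q2: worker threads -> child processes. Crosses the process boundary,
# so it must remain a multiprocessing queue.
q2 = JoinableQueue()

q1.put("http://ibm.com")
item = q1.get()
q1.task_done()
q1.join()  # returns immediately: every put() has been marked done
```

With no multiprocessing Connection behind q1, there is nothing for interpreter cleanup to close out from under the blocked threads, which is why the error disappears.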
