如何在多个过程中正确使用管道(>2)



如何在多个进程中正确使用管道(>2(?

例如。 一个生产者几个消费者

这些代码在 Linux 环境中是失败的但是Windows环境很好

import multiprocessing, time
def consumer(pipe,id):
    output_p, input_p = pipe
    input_p.close()                    
    while True:
        try:
            item = output_p.recv()
        except EOFError:
            break
        print("%s consume:%s" % (id,item))
        #time.sleep(3)      # if no sleep  these code will fault in Linux environment
                            # but windows environment is well
    print('Consumer done')
def producer(sequence, input_p):
    for item in sequence:
        print('produce:',item)
        input_p.send(item) 
        time.sleep(1)
if __name__ == '__main__':
    (output_p, input_p) = multiprocessing.Pipe()
    # create two consumer process
    cons_p1 = multiprocessing.Process(target=consumer,args=((output_p,input_p),1)) 
    cons_p1.start() 
    cons_p2 = multiprocessing.Process(target=consumer,args=((output_p,input_p),2))
    cons_p2.start() 
    output_p.close()
    sequence = [i for i in range(10)]
    producer(sequence, input_p)
    input_p.close()
    cons_p1.join()
    cons_p2.join()

不要对多个使用者使用管道。文档明确指出,当两个以上的进程读取或写入时,它将损坏。你做什么;两个读者。

Pipe(( 返回的两个连接对象表示管道的两端。每个连接对象都有 send(( 和 recv(( 方法(等等(。请注意,如果两个进程(或线程(尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。当然,同时使用管道不同端的流程不会造成损坏的风险。

所以使用队列,甚至使用JoinableQueue。

from multiprocessing import Process, JoinableQueue
from Queue import Empty
import time

def consumer(que, pid):
    while True:
        try:
            item = que.get(timeout=10)
            print("%s consume:%s" % (pid, item))
            que.task_done()
        except Empty:
            break
    print('Consumer done')

def producer(sequence, que):
    for item in sequence:
        print('produce:', item)
        que.put(item) 
        time.sleep(1)
if __name__ == '__main__':
    que = JoinableQueue()
    # create two consumer process
    cons_p1 = Process(target=consumer, args=(que, 1)) 
    cons_p1.start() 
    cons_p2 = Process(target=consumer, args=(que, 2))
    cons_p2.start() 
    sequence = [i for i in range(10)]
    producer(sequence, que)
    que.join()
    cons_p1.join()
    cons_p2.join()

相关内容

  • 没有找到相关文章

最新更新