如何在多个进程中正确使用管道(>2(?
例如。 一个生产者几个消费者
这些代码在 Linux 环境中是失败的但是Windows环境很好
import multiprocessing, time
def consumer(pipe,id):
output_p, input_p = pipe
input_p.close()
while True:
try:
item = output_p.recv()
except EOFError:
break
print("%s consume:%s" % (id,item))
#time.sleep(3) # if no sleep these code will fault in Linux environment
# but windows environment is well
print('Consumer done')
def producer(sequence, input_p):
for item in sequence:
print('produce:',item)
input_p.send(item)
time.sleep(1)
if __name__ == '__main__':
(output_p, input_p) = multiprocessing.Pipe()
# create two consumer process
cons_p1 = multiprocessing.Process(target=consumer,args=((output_p,input_p),1))
cons_p1.start()
cons_p2 = multiprocessing.Process(target=consumer,args=((output_p,input_p),2))
cons_p2.start()
output_p.close()
sequence = [i for i in range(10)]
producer(sequence, input_p)
input_p.close()
cons_p1.join()
cons_p2.join()
不要对多个使用者使用管道。文档明确指出,当两个以上的进程读取或写入时,它将损坏。你做什么;两个读者。
Pipe(( 返回的两个连接对象表示管道的两端。每个连接对象都有 send(( 和 recv(( 方法(等等(。请注意,如果两个进程(或线程(尝试同时读取或写入管道的同一端,管道中的数据可能会损坏。当然,同时使用管道不同端的流程不会造成损坏的风险。
所以使用队列,甚至使用JoinableQueue。
from multiprocessing import Process, JoinableQueue
from Queue import Empty
import time
def consumer(que, pid):
while True:
try:
item = que.get(timeout=10)
print("%s consume:%s" % (pid, item))
que.task_done()
except Empty:
break
print('Consumer done')
def producer(sequence, que):
for item in sequence:
print('produce:', item)
que.put(item)
time.sleep(1)
if __name__ == '__main__':
que = JoinableQueue()
# create two consumer process
cons_p1 = Process(target=consumer, args=(que, 1))
cons_p1.start()
cons_p2 = Process(target=consumer, args=(que, 2))
cons_p2.start()
sequence = [i for i in range(10)]
producer(sequence, que)
que.join()
cons_p1.join()
cons_p2.join()