我遵循本文中规定的原则,安全地输出最终将写入文件的结果。不幸的是,代码只打印 1 和 2,而不是 3 到 6。
import os
import argparse
import pandas as pd
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
def feed(queue, parlist):
for par in parlist:
queue.put(par)
print("Queue size", queue.qsize())
def calc(queueIn, queueOut):
while True:
try:
par=queueIn.get(block=False)
res=doCalculation(par)
queueOut.put((res))
queueIn.task_done()
except:
break
def doCalculation(par):
return par
def write(queue):
while True:
try:
par=queue.get(block=False)
print("response:",par)
except:
break
if __name__ == "__main__":
nthreads = 2
workerQueue = Queue()
writerQueue = Queue()
considerperiod=[1,2,3,4,5,6]
feedProc = Process(target=feed, args=(workerQueue, considerperiod))
calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target=write, args=(writerQueue,))
feedProc.start()
feedProc.join()
for p in calcProc:
p.start()
for p in calcProc:
p.join()
writProc.start()
writProc.join()
在运行它打印的代码时,
$ python3 tst.py
Queue size 6
response: 1
response: 2
此外,是否可以确保写入函数始终输出 1,2,3,4,5,6,即以与数据输入馈送队列相同的顺序输出?
错误在某种程度上与task_done()
调用有关。如果你删除那个,那么它可以工作,不要问我为什么(IMO 这是一个错误)。但它的工作方式是queueIn.get(block=False)
调用引发异常,因为队列为空。对于您的用例来说,这可能已经足够了,更好的方法是使用 sentinels (如多处理文档中所建议的,请参阅最后一个示例)。下面是一些重写,以便您的程序使用哨兵:
import os
import argparse
import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
def feed(queue, parlist, nthreads):
for par in parlist:
queue.put(par)
for i in range(nthreads):
queue.put(None)
print("Queue size", queue.qsize())
def calc(queueIn, queueOut):
while True:
par=queueIn.get()
if par is None:
break
res=doCalculation(par)
queueOut.put((res))
def doCalculation(par):
return par
def write(queue):
while not queue.empty():
par=queue.get()
print("response:",par)
if __name__ == "__main__":
nthreads = 2
workerQueue = Queue()
writerQueue = Queue()
considerperiod=[1,2,3,4,5,6]
feedProc = Process(target=feed, args=(workerQueue, considerperiod, nthreads))
calcProc = [Process(target=calc, args=(workerQueue, writerQueue)) for i in range(nthreads)]
writProc = Process(target=write, args=(writerQueue,))
feedProc.start()
feedProc.join()
for p in calcProc:
p.start()
for p in calcProc:
p.join()
writProc.start()
writProc.join()
需要注意的几点:
- 哨兵正在将
None
放入队列中。请注意,每个工作进程都需要一个哨兵。 - 对于
write
函数,您不需要执行 sentinel 处理,因为只有一个进程,并且您不需要处理并发性(如果您要执行empty()
然后在calc
函数中get()
thingie,则会遇到问题,例如队列中只剩下一个项目并且两个工作人员同时检查empty()
,然后都想做get()
然后其中一个被永久锁定) - 您不需要将
feed
和write
放入进程中,只需将它们放入您的主函数中,因为您无论如何都不想并行运行它。
如何在输出中具有与输入相同的顺序?[...]我想multiprocessing.map可以做到这一点
是的,地图保持秩序。将程序重写为更简单的程序(因为您不需要workerQueue
和writerQueue
并添加随机睡眠来证明输出仍然有序:
from multiprocessing import Pool
import time
import random
def calc(val):
time.sleep(random.random())
return val
if __name__ == "__main__":
considerperiod=[1,2,3,4,5,6]
with Pool(processes=2) as pool:
print(pool.map(calc, considerperiod))