The following code first starts several processes. It then runs a `while True` loop to check the queue objects. Finally, it iterates over the processes to check whether any are still alive; once all processing has finished, it breaks out of the while loop. Unfortunately, this can happen while a queue object is not yet empty. Breaking out of the loop without retrieving the data stored in the queue is an easy way to overlook data loss. How can the logic be modified to guarantee that the queue objects are empty before the loop is broken?
import time, multiprocessing, os

logger = multiprocessing.log_to_stderr()

def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())

items = dict()
for i in range(5):
    queue = multiprocessing.Queue()
    proc = multiprocessing.Process(target=foo, args=(queue,))
    items[proc] = queue
    proc.start()

time.sleep(0.1)
while True:
    time.sleep(1)
    for proc, queue in items.items():
        if not queue.empty():
            print(queue.get())
    if not True in [proc.is_alive() for proc in items]:
        if not queue.empty():
            logger.warning('...not empty: %s' % queue.get())
        break
The same synchronization problem again: when you check the queue and find it empty, there is no guarantee that new items won't show up later.
When a child process finishes its job, it can put a sentinel on the queue to signal that no more items are coming. The parent process can then drain the queue until it gets the sentinel. This is also the approach `multiprocessing.Pool` uses internally to shut down its workers. Here you can use `None` as the sentinel:
def foo(*args):
    for i in range(3):
        queue = args[0]
        queue.put(os.getpid())
    queue.put(None)

...

while items:
    for proc in tuple(items.keys()):
        queue = items[proc]
        if not queue.empty():
            r = queue.get()
            print(r)
            if r is None:
                proc.join()
                del items[proc]
    time.sleep(0.1)
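Assembled into a complete, runnable Python 3 sketch (the worker count of 5 and the 3 items per worker follow the question's code; everything else is unchanged):

```python
import multiprocessing
import os

def foo(queue):
    # Produce a few items, then a None sentinel to signal completion.
    for _ in range(3):
        queue.put(os.getpid())
    queue.put(None)

if __name__ == '__main__':
    items = {}
    for _ in range(5):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=foo, args=(queue,))
        items[proc] = queue
        proc.start()

    results = []
    while items:
        for proc in tuple(items):   # copy: items is mutated inside the loop
            r = items[proc].get()   # blocks until the worker puts something
            if r is None:           # sentinel: this worker has sent everything
                proc.join()
                del items[proc]
            else:
                results.append(r)

    print(len(results))  # 5 workers x 3 items each = 15
```

Because `get()` blocks, no `sleep()` polling is needed, and the sentinel guarantees each queue is fully drained before its worker is joined.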
A working solution is posted below. This approach uses the `multiprocessing.pool.ThreadPool.map_async` method to start the workers (as threads, rather than running each proc via `Process.run`). A `multiprocessing.Queue` object is then used to store the data produced by the `foo` function, making it accessible to the MainProcess.
import time, multiprocessing, Queue
from multiprocessing.pool import ThreadPool

logger = multiprocessing.log_to_stderr()

def foo(args):
    queue = args[0]
    arg = args[1]
    for i in range(3):
        time.sleep(2)
        queue.put([arg, time.time()])

pool = ThreadPool(processes=4)
queue = multiprocessing.Queue()
map_result = pool.map_async(foo, [(queue, arg) for arg in range(3)])
logger.warning("map_result: %s" % map_result)
map_result.wait(timeout=10)
if not map_result.ready():
    message = '%s is timed out and terminated.' % pool
    logger.error(message)
    pool.terminate()
    raise Exception(message)
while not queue.empty():
    logger.warning("queue_data: %r" % queue.get(True, 0.1))
pool.close()
pool.join()
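An alternative to polling `queue.empty()` at the end: once `map_result.ready()` is true, every `foo` call has returned, so the number of queued items is known exactly (here 3 tasks x 3 puts each). Draining by count avoids the empty-check race entirely. A Python 3 sketch under that assumption (the task and put counts are illustrative):

```python
from multiprocessing import Queue
from multiprocessing.pool import ThreadPool

N_TASKS, PUTS_PER_TASK = 3, 3

def foo(args):
    queue, arg = args
    for i in range(PUTS_PER_TASK):
        queue.put([arg, i])

if __name__ == '__main__':
    queue = Queue()
    pool = ThreadPool(processes=4)
    result = pool.map_async(foo, [(queue, arg) for arg in range(N_TASKS)])
    result.wait(timeout=10)
    if not result.ready():
        pool.terminate()
        raise RuntimeError('workers timed out')
    # All puts have happened, so exactly N_TASKS * PUTS_PER_TASK items
    # are retrievable; get() blocks until each one arrives.
    data = [queue.get() for _ in range(N_TASKS * PUTS_PER_TASK)]
    pool.close()
    pool.join()
    print(len(data))  # 9
```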
# encoding: utf-8
from multiprocessing import Pool, Manager

def tiny(q, j):
    if len(j) < 100:
        q.put(j + j[-1])
        print " Done!", j
    q.put(-1)
    return

queue = Manager().Queue()
pool = Pool(processes=10)
pool.apply_async(tiny, (queue, "A"))
pool.apply_async(tiny, (queue, "B"))
pool.apply_async(tiny, (queue, "C"))

created = 3
finished = 0
while created > finished:
    i = queue.get(True, None)
    if isinstance(i, int):
        finished += 1
    else:
        created += 1
        pool.apply_async(tiny, (queue, i))

pool.close()
pool.join()
print [worker.is_alive() for worker in pool._pool]
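For reference, the same counting pattern as a Python 3 sketch; it also avoids touching the private `pool._pool` attribute at the end. The seed strings and the length-100 cutoff follow the answer above; the context managers are a modernization, not part of the original:

```python
from multiprocessing import Pool, Manager

CUTOFF = 100  # stop growing a string once it reaches this length

def tiny(q, j):
    if len(j) < CUTOFF:
        q.put(j + j[-1])  # follow-up work for the parent to schedule
    q.put(-1)             # sentinel: exactly one per finished task

if __name__ == '__main__':
    with Manager() as manager:
        queue = manager.Queue()
        with Pool(processes=10) as pool:
            created, finished = 0, 0
            for seed in ("A", "B", "C"):
                pool.apply_async(tiny, (queue, seed))
                created += 1
            # Every task puts exactly one -1, so the counters converge
            # once no task is left that could still enqueue new work.
            while created > finished:
                i = queue.get()
                if isinstance(i, int):
                    finished += 1
                else:
                    created += 1
                    pool.apply_async(tiny, (queue, i))
            pool.close()
            pool.join()
        print(created, finished)  # equal by construction
```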