>我正在尝试实现一个函数,该函数将 2 个函数作为参数,运行两个函数,返回首先返回的函数的值,并在完成执行之前杀死速度较慢的函数。 我的问题是,当我尝试清空用于收集返回值的 Queue 对象时,我卡住了。 有没有更"正确"的方法来处理这种情况甚至现有模块?如果没有,谁能解释我做错了什么? 这是我的代码(上述函数的实现是'run_both(('(:
import multiprocessing as mp
from time import sleep
Q = mp.Queue()
def dump_queue(queue):
result = []
for i in iter(queue.get, 'STOP'):
result.append(i)
return result
def rabbit(x):
sleep(10)
Q.put(x)
def turtle(x):
sleep(30)
Q.put(x)
def run_both(a,b):
a.start()
b.start()
while a.is_alive() and b.is_alive():
sleep(1)
if a.is_alive():
a.terminate()
else:
b.terminate()
a.join()
b.join()
return dump_queue(Q)
p1 = mp.Process(target=rabbit, args=(1,))
p1 = mp.Process(target=turtle, args=(2,))
run_both(p1, p2)
下面是一个使用multiprocessing
调用 2 个或更多函数并返回最快结果的示例。 但是,有一些重要的事情需要注意。
- 在 IDLE 中运行
multiprocessing
代码有时会导致问题。这个例子有效,但我确实遇到了这个问题试图解决这个问题。 - 多处理代码应从
if __name__ == '__main__'
子句内部开始,否则,如果主模块由另一个进程重新导入,它将再次运行。 有关详细信息,请阅读多处理文档页面。 - 结果队列直接传递给使用它的每个进程。通过在模块中引用全局名称来使用队列时,代码在 Windows 上失败,因为每个进程都使用队列的新实例。在此处阅读更多内容 Multiprocess Queue.get(( 挂起
我还在这里添加了一些功能,以了解实际使用了哪个进程的结果。
import multiprocessing as mp
import time
import random
def task(value):
# our dummy task is to sleep for a random amount of time and
# return the given arg value
time.sleep(random.random())
return value
def process(q, idx, fn, args):
# simply call function fn with args, and push its result in the queue with its index
q.put([fn(*args), idx])
def fastest(calls):
queue = mp.Queue()
# we must pass the queue directly to each process that may use it
# or else on Windows, each process will have its own copy of the queue
# making it useless
procs = []
# create a 'mp.Process' that calls our 'process' for each call and start it
for idx, call in enumerate(calls):
fn = call[0]
args = call[1:]
p = mp.Process(target=process, args=(queue, idx, fn, args))
procs.append(p)
p.start()
# wait for the queue to have something
result, idx = queue.get()
for proc in procs: # kill all processes that may still be running
proc.terminate()
# proc may be using queue, so queue may be corrupted.
# https://docs.python.org/3.8/library/multiprocessing.html?highlight=queue#multiprocessing.Process.terminate
# we no longer need queue though so this is fine
return result, idx
if __name__ == '__main__':
from datetime import datetime
start = datetime.now()
print(start)
# to be compatible with 'fastest', each call is a list with the first
# element being callable, followed by args to be passed
calls = [
[task, 1],
[task, 'hello'],
[task, [1,2,3]]
]
val, idx = fastest(calls)
end = datetime.now()
print(end)
print('elapsed time:', end-start)
print('returned value:', val)
print('from call at index', idx)
示例输出:
2019-12-21 04:01:09.525575
2019-12-21 04:01:10.171891
elapsed time: 0:00:00.646316
returned value: hello
from call at index 1
除了倒数第二行的错别字,应该为:
p2 = mp.Process(target=turtle, args=(2,)) # not p1
要使程序正常工作,您可以做的最简单的更改是添加:
Q.put('STOP')
到turtle()
和rabbit()
的结尾.
你也不需要继续循环观察进程是否处于活动状态,根据定义,如果你只是读取消息队列并接收STOP
,其中一个已经完成,所以你可以run_both()
替换为:
def run_both(a,b):
a.start()
b.start()
result = dump_queue(Q)
a.terminate()
b.terminate()
return result
您可能还需要考虑如果两个进程几乎同时将一些消息放入队列中会发生什么情况。他们可能会混为一谈。也许可以考虑使用 2 个队列,或者将所有结果合并到一条消息中,而不是将多个值从queue.get()