池、队列、挂起



我想使用队列来保存结果,因为我希望使用者(串行不是并行(在工作线程产生结果时处理工作线程的结果。

现在,我想知道为什么以下程序挂起。

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
time.sleep(0.2)
q, arr = arg 
q.put(arr[0])
p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()
for i in range(4):
x[0] = i 
#worker((q,x))
p.apply_async(worker, args=((q, x),)) 
print("done_apply")
time.sleep(0.2)
for i in range(4):
print(q.get())

Queue对象无法共享。我首先通过找到这个答案得出了与OP相同的结论。

不幸的是,此代码中还有其他问题(这并不能使其成为链接答案的精确副本(

  • worker(arg)应该worker(*arg),以便参数拆包工作。没有这个,我的过程也被锁定了(我承认我不知道为什么。它应该抛出一个异常,但我想多处理和异常不能很好地协同工作(
  • 将相同的x传递给工作人员会导致相同的数字(apply它有效,但不适用于apply_async

另一件事:为了使代码可移植,请通过if __name__ == "__main__":包装主代码,这在 Windows 上是必需的,因为进程生成的差异

为我输出 0,3,2,1 的完全固定代码:

import multiprocessing as mp
import time
import numpy as np
def worker(*arg):  # there are 2 arguments to "worker"
#def worker(q, arr):  # is probably even better
time.sleep(0.2)
q, arr = arg
q.put(arr[0])
if __name__ == "__main__":
p = mp.Pool(4)
m = mp.Manager()  # use a manager, Queue objects cannot be shared
q = m.Queue()
for i in range(4):
x = np.array([4,4])  # create array each time (or make a copy)
x[0] = i
p.apply_async(worker, args=(q, x))
print("done_apply")
time.sleep(0.2)
for i in range(4):
print(q.get())

更改要应用的apply_async会给出错误消息:

"Queue objects should only be shared between processes through inheritance"

一个解决方案:

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
time.sleep(0.2)
q, arr = arg
q.put(arr[0])
p = mp.Pool(4)
x = np.array([4,4])
m = mp.Manager()
q = m.Queue()
for i in range(4):
x[0] = i
#worker((q,x))
p.apply_async(worker, args=((q, x),))
print("done_apply")
time.sleep(0.2)
for i in range(4):
print(q.get())

结果:

done_apply
3
3
3
3

显然,我需要手动复制 numpy 数组,因为所需的结果应该是任何顺序的 0、1、2、3,而不是 3、3、3、3。

我认为您选择将multiprocessing.Pool与您自己的queue一起使用是您遇到的主要问题的根源。使用池会预先创建子进程,稍后会将作业分配给这些子进程。但是,由于您无法(轻松地(将queue传递给已经存在的流程,因此这与您的问题不太匹配。

相反,您应该摆脱自己的队列并使用池中内置的队列来获取workerreturn的值,或者完全废弃池并使用multiprocessing.Process为必须执行的每个任务启动新过程。

我还要指出,您的代码在主进程中存在修改x数组的主线程和在将旧值发送到工作进程之前序列化旧值的线程之间的竞争条件。大多数时候,您最终可能会发送同一数组的许多副本(带有最终值(,而不是您想要的几个不同值。

这是一个快速且未经测试的版本,可以删除队列:

def worker(arr):
time.sleep(0.2)
return arr[0]
if __name__ == "__main__":
p = mp.Pool(4)
results = p.map(worker, [np.array([i, 4]) for i in range(4)])
p.join()
for result in results:
print(result)

这是一个删除Pool并保持队列的版本:

def worker(q, arr): 
time.sleep(0.2)
q.put(arr[0])
if __name__ == "__main__":
q = m.Queue()
processes = []
for i in range(4):
p = mp.Process(target=worker, args=(q, np.array([i, 4])))
p.start()
processes.append(p)
for i in range(4):
print(q.get())
for p in processes:
p.join()

请注意,在上一版本中,在尝试join进程之前,从队列中get结果可能很重要(尽管如果我们只处理四个值,则可能不会(。如果队列填满,如果我们以其他顺序执行,则可能会发生死锁。工作进程在尝试写入队列时可能会被阻止,而主进程在等待工作进程退出时被阻止。

相关内容

  • 没有找到相关文章

最新更新