我想将多处理任务封装到一个类中。控制函数和辅助函数都是类的成员。工作进程使用Pool.map_async()
运行,因此可以在其他工作进程仍在运行时处理结果。处理结果存储在CCD_ 2中。当Queue是实例变量时,它不起作用,而全局变量或类变量则起作用。
示例:
import multiprocessing
class A():
# Queue as instance variable
def __init__(self):
self.qout = multiprocessing.Queue()
def worker(self,x):
self.qout.put(x*x)
def process(self):
values = range(10)
with multiprocessing.Pool() as pool:
res = pool.map_async(self.worker,values)
while (not self.qout.empty() or
not res.ready()):
val = self.qout.get()
print(val)
qoutB = multiprocessing.Queue()
class B():
# Queue as global variable
def __init__(self):
pass
def worker(self,x):
qoutB.put(x*x)
def process(self):
values = range(10)
with multiprocessing.Pool() as pool:
res = pool.map_async(self.worker,values)
while (not qoutB.empty() or
not res.ready()):
val = qoutB.get()
print(val)
class C():
# Queue as Class variable
qout = multiprocessing.Queue()
def __init__(self):
pass
def worker(self,x):
self.qout.put(x*x)
def process(self):
values = range(10)
with multiprocessing.Pool() as pool:
res = pool.map_async(self.worker,values)
while (not self.qout.empty() or
not res.ready()):
val = self.qout.get()
print(val)
现在,当您如下调用类时(将其放在类定义下面(
a=A()
a.process()
不工作(可能在self.qout.get()
上停止等待,但
a=B()
a.process()
和
a=C()
a.process()
作品(打印结果(。为什么?
我在Python文档中没有找到任何相关信息。我还没有尝试将队列作为参数传递,但它是一个应该对用户隐藏的功能。
B选项应该是毫无疑问的,C不是理想的,因为队列将在类的所有实例之间共享。
注意:这是在Linux上测试的(Debian,Python 3.5来自存储库(。
同样,这不是您问题的答案。然而,我之所以发布它,是因为它让整个问题变得毫无意义——因为你真的不需要显式地创建和使用multiprocessing.Queue
来做这样的事情。
相反,可以考虑使用concurrent.futures.ProcessPoolExecutor
来完成任务。
例如:
import concurrent.futures
class A_Prime():
def __init__(self):
pass
def worker(self, x):
return x*x
def process(self):
with concurrent.futures.ProcessPoolExecutor() as executor:
classname = type(self).__name__
print(classname, '- calling executor.map')
res = [value for value in executor.map(self.worker, range(10))]
print(classname, '- executor.map finished')
print(' result:', res)
if __name__ == '__main__':
test = A_Prime()
test.process()
print('done')
输出:
A_Prime - calling executor.map
A_Prime - executor.map finished
result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
done
SO算法给了我以前找不到的有趣提示。
基于这个答案,队列不能作为参数传递给正在打开新进程的函数,因为队列不能被pickle。这就是self.function()
的作用:它等价于function(self)
。在类A
的情况下,尝试将队列传递给工作者;其中,与B
和C
一样,它不是并且或多或少地独立于过程
从这个问题和答案中得出同样的推论。不用说,manager.Queue
在这里也不起作用。
MCVE测试失败
这可能是由于multiprocessing
(见文档(的默认启动方法不同