多处理。队列在类中不用作实例变量?



我想将多处理任务封装到一个类中。控制函数和辅助函数都是类的成员。工作进程使用Pool.map_async()运行,因此可以在其他工作进程仍在运行时处理结果。处理结果存储在CCD_ 2中。当Queue是实例变量时,它不起作用,而全局变量或类变量则起作用。

示例:

import multiprocessing 
class A():
# Queue as instance variable
def __init__(self):
self.qout = multiprocessing.Queue()
def worker(self,x):
self.qout.put(x*x)   
def process(self):
values = range(10)
with multiprocessing.Pool() as pool:
res = pool.map_async(self.worker,values)        
while (not self.qout.empty() or
not res.ready()):
val = self.qout.get()
print(val)
qoutB = multiprocessing.Queue()
class B():
# Queue as global variable
def __init__(self):
pass   
def worker(self,x):
qoutB.put(x*x)       
def process(self):
values = range(10)       
with multiprocessing.Pool() as pool:
res = pool.map_async(self.worker,values)           
while (not qoutB.empty() or
not res.ready()):
val = qoutB.get()
print(val)
class C():
# Queue as Class variable
qout = multiprocessing.Queue()
def __init__(self):
pass
def worker(self,x):
self.qout.put(x*x)   
def process(self):
values = range(10)
with multiprocessing.Pool() as pool:
res = pool.map_async(self.worker,values)        
while (not self.qout.empty() or
not res.ready()):
val = self.qout.get()
print(val)  

现在,当您如下调用类时(将其放在类定义下面(

a=A()
a.process()

不工作(可能在self.qout.get()上停止等待,但

a=B()
a.process()

a=C()
a.process()

作品(打印结果(。为什么?

我在Python文档中没有找到任何相关信息。我还没有尝试将队列作为参数传递,但它是一个应该对用户隐藏的功能。

B选项应该是毫无疑问的,C不是理想的,因为队列将在类的所有实例之间共享。

注意:这是在Linux上测试的(Debian,Python 3.5来自存储库(。

同样,这不是您问题的答案。然而,我之所以发布它,是因为它让整个问题变得毫无意义——因为你真的不需要显式地创建和使用multiprocessing.Queue来做这样的事情。

相反,可以考虑使用concurrent.futures.ProcessPoolExecutor来完成任务。

例如:

import concurrent.futures
class A_Prime():
def __init__(self):
pass
def worker(self, x):
return x*x
def process(self):
with concurrent.futures.ProcessPoolExecutor() as executor:
classname = type(self).__name__
print(classname, '- calling executor.map')
res = [value for value in executor.map(self.worker, range(10))]
print(classname, '- executor.map finished')
print('  result:', res)

if __name__ == '__main__':
test = A_Prime()
test.process()
print('done')

输出:

A_Prime - calling executor.map
A_Prime - executor.map finished
result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
done

SO算法给了我以前找不到的有趣提示。

基于这个答案,队列不能作为参数传递给正在打开新进程的函数,因为队列不能被pickle。这就是self.function()的作用:它等价于function(self)。在类A的情况下,尝试将队列传递给工作者;其中,与BC一样,它不是并且或多或少地独立于过程

从这个问题和答案中得出同样的推论。不用说,manager.Queue在这里也不起作用。

MCVE测试失败

这可能是由于multiprocessing(见文档(的默认启动方法不同

相关内容

  • 没有找到相关文章

最新更新