Python multiprocessing:在异步调用中调用方法和传递对象



我试图用apply_async(https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult)调用完成两件事:

(i)调用类方法 (ii)传递一个对象作为参数 到目前为止,我有以下基线代码:
import multiprocessing as mp
class myClass():
def __init__(self, id):
self.id = id
self.val = 1.0
self.pool = None
def callback(self, obj):
self.val = obj.val
def foo(new_val):  # foo is outside myClass
print ('foo passed with', new_val)
c1.val = new_val
return c1
if __name__ == '__main__':
c1 = myClass('c1')
c1.pool = mp.Pool(processes=1)
c1.pool.apply_async(foo, args=(2.0, ), callback=c1.callback).wait()  
c1.pool.close()
c1.pool.join()
print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

输出:

foo passed with 2.0
c1.val: 2.0

当我尝试用下面的代码完成(I)时,我没有得到与上面相同的输出。

class myClass():
def __init__(self, id):
self.id = id
self.val = 1.0
self.pool = None
def callback(self, obj):
self.val = obj.val
def foo(self, new_val):  # foo is inside myClass
print ('foo passed with', new_val)
self.val = new_val
return self
if __name__ == '__main__':
c1 = myClass('c1')
c1.pool = mp.Pool(processes=1)
c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback).wait()  
c1.pool.close()
c1.pool.join()
print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

输出:

c1.val: 1.0

同样,当我尝试完成(ii)时,foo不会再次被调用。

class myClass():
def __init__(self, id):
self.id = id
self.val = 1.0
self.pool = None
def callback(self, obj):
self.val = obj.val
def foo(obj, new_val):  # foo is outside myClass
print ('foo passed with', new_val)
obj.val = new_val
return obj
if __name__ == '__main__':
c1 = myClass('c1')
c1.pool = mp.Pool(processes=1)
c1.pool.apply_async(foo, args=(c1, 2.0, ), callback=c1.callback).wait()  
c1.pool.close()
c1.pool.join()
print ('c1.val:', c1.val)  # should display 'c1 val: 2.0'

输出:

c1.val: 1.0

你知道在上面的代码中需要改变什么来完成(i)和(ii)吗?

调用未完成而引发异常。您可以使用multiprocessing.pool.AsyncResult.successful方法检查:

import multiprocessing as mp

class myClass():
def __init__(self, id):
self.id = id
self.val = 1.0
self.pool = None
def callback(self, obj):
self.val = obj.val
def foo(self, new_val):
print ('foo passed with', new_val)
self.val = new_val
return self
if __name__ == '__main__':
c1 = myClass('c1')
c1.pool = mp.Pool(processes=1)
async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback)
async_result.wait()
print(async_result.successful())  # this is printing False!!!
c1.pool.close()
c1.pool.join()
print ('c1.val:', c1.val)

现在你可以定义一个error_callback来看看发生了什么:

...
async_result = c1.pool.apply_async(c1.foo, args=(2.0, ), callback=c1.callback, error_callback=lambda x: print(x))
...

这是这个函数打印的错误:

pool objects cannot be passed between processes or pickled

在这个SO问题上,你可以找到更多关于为什么会发生这种情况的信息。问题是multiprocessing代码必须pickle它发送给它已经启动的子进程的东西,而pickle程序不执行实例方法。

相关内容

  • 没有找到相关文章

最新更新