I am using multiprocessing.Pool inside a class and am trying the following:
from multiprocessing import Lock, Pool

class A:
    def __init__(self):
        self.lock = Lock()
        self.file = open('test.txt')

    def function(self, i):
        self.lock.acquire()
        line = self.file.readline()
        self.lock.release()
        return line

    def anotherfunction(self):
        pool = Pool()
        results = pool.map(self.function, range(10000))
        pool.close()
        pool.join()
        return results
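However, I get a runtime error saying that Lock objects should only be shared between processes through inheritance. I am fairly new to Python and multiprocessing. How can I get on the right track?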
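A multiprocessing.Lock instance can be an attribute of a multiprocessing.Process instance. When a process with a lock attribute is created in the main process, the lock exists in the main process's address space. When the process's start method is called and a child process is launched (which in turn calls the process's run method), the lock has to be serialized/deserialized into the child process's address space. This works as expected: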
from multiprocessing import Lock, Process

class P(Process):
    def __init__(self, *args, **kwargs):
        Process.__init__(self, *args, **kwargs)
        self.lock = Lock()

    def run(self):
        print(self.lock)

if __name__ == '__main__':
    p = P()
    p.start()
    p.join()
Prints:
<Lock(owner=None)>
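Unfortunately, this does not work when you are dealing with a multiprocessing.Pool instance. In your example, self.lock is created in the main process by the __init__ method. But when Pool.map is called to invoke self.function, the lock cannot be serialized/deserialized to the already-running pool processes that will execute this method.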
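The failure can be reproduced without a pool at all. Pool.map pickles the bound method it is given, which pickles the instance and therefore its lock attribute, and a multiprocessing.Lock refuses to be pickled outside of process start-up. A minimal sketch (the helper name try_to_pickle_lock is only illustrative):

```python
import pickle
from multiprocessing import Lock


def try_to_pickle_lock():
    """Return the error message raised when a Lock is pickled directly."""
    try:
        pickle.dumps(Lock())
    except RuntimeError as exc:
        # Same error that Pool.map surfaces when the task carries a Lock
        return str(exc)
    return None


if __name__ == '__main__':
    print(try_to_pickle_lock())
```

The message returned is the same "should only be shared between processes through inheritance" error reported in the question.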
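The solution is to initialize each pool process with a global variable set to this lock (there is no longer any point in making the lock an attribute of the class). The way to do this is with the initializer and initargs arguments of the Pool __init__ method. See the documentation: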
from multiprocessing import Lock, Pool

def init_pool_processes(the_lock):
    '''Initialize each process with a global variable lock.'''
    global lock
    lock = the_lock

class Test:
    def function(self, i):
        lock.acquire()
        with open('test.txt', 'a') as f:
            print(i, file=f)
        lock.release()

    def anotherfunction(self):
        lock = Lock()
        pool = Pool(initializer=init_pool_processes, initargs=(lock,))
        pool.map(self.function, range(10))
        pool.close()
        pool.join()

if __name__ == '__main__':
    t = Test()
    t.anotherfunction()
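If a global variable is undesirable, one alternative (not the approach described above) is a lock created by multiprocessing.Manager. Such a lock is a proxy object that can be pickled, so it can be passed to the workers as an ordinary argument. A rough sketch, where the names append_number and run are made up for illustration:

```python
from multiprocessing import Manager, Pool


def append_number(args):
    """Append one number to a file while holding the shared lock."""
    lock, path, i = args  # the lock proxy arrives as a normal argument
    with lock:  # released even if the write raises
        with open(path, 'a') as f:
            print(i, file=f)
    return i


def run(path, n):
    # manager.Lock() returns a proxy to a lock that lives in the manager
    # process; unlike multiprocessing.Lock, the proxy can be pickled.
    with Manager() as manager:
        lock = manager.Lock()
        with Pool() as pool:
            tasks = [(lock, path, i) for i in range(n)]
            return pool.map(append_number, tasks)


if __name__ == '__main__':
    print(run('test.txt', 10))
```

The trade-off is that every acquire and release becomes a round trip to the manager process, so this is likely slower than the initializer approach when the lock is taken frequently.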