ProcessPool中的Python实例计数器



我很难找到实现以下内容的正确方法:我在Python3中有一个类,我为它保留了一个实例计数器。使用concurrent.futures.ProcessPoolExecutor,我提交了几个使用此类的任务。我认为,由于任务在不同的进程中运行,它们之间不会有共享状态,但似乎我错了,因为这个实例计数器是在它们之间共享的。以下代码举例说明了我的意思:

import concurrent.futures
class A:
counter = 0
def __init__(self):
A.counter += 1
self.id = A.counter
def hello(self):
return f'Hello from node{self.id}'
def start():
instance = A()
return instance.hello()
results = []
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
for i in range(4):
f = executor.submit(start)
results.append(f)
for r in results:
print(r.result())

以上输出为:

Hello from node1
Hello from node2
Hello from node1
Hello from node1

问题不在于访问计数器时的竞争条件,我的问题是,当我期望变量在每个进程中是私有的时,它甚至是共享的(例如,每个工作者从0开始(。实现这一目标的蟒蛇方式是什么?

提前谢谢。

在这里,您似乎发现任务并不总是均匀地分布在处理池中的工作人员之间,并且其中一个工作人员成功地完成了2"任务";而其中一人(4人(一无所获。在每个worker中,通过从调用fork的时间(*nix(或从import复制主文件(Windows和MacOS(来定义A类。类属性counter的行为类似于全局变量,因为它没有被定义为实例属性,所以任何获得多个任务的工作者都会看到该值每次都在增加。虽然可以通过限制员工在被解雇和重新开始之前只能完成一项任务来避免这种情况,但通常情况下,避免全局状态是一种好的做法。maxtasksperchild更频繁地用于清理子进程由于各种原因可能不会随着时间的推移释放内存或文件句柄以防止泄漏的实例。

正如您在评论中所说,长时间运行的任务可以减少每个任务重新启动流程的开销的相对影响,但是,如果您使用的任何函数将函数映射到可迭代函数上,并接受chunksize参数,这种方法可能会失败。A";任务";不是映射的一次迭代,但可以同时进行多次迭代(以减少传递参数和结果的开销(。这个例子应该演示一个带有maxtasksperchild=1的池,其中每个孩子最终调用start()4次:

from multiprocessing import Pool
class A:
counter = 0
def __init__(self):
A.counter += 1
self.id = A.counter
def hello(self):
return f'Hello from node{self.id}'
def start(_):
instance = A()
print( instance.hello())
if __name__ == "__main__":
with Pool(4, maxtasksperchild=1) as p:
p.map(start, range(16), chunksize=4)

最新更新