并行执行列表中每个对象中的方法



我有一个对象列表,我想在每个对象中并行执行一个方法。该方法修改对象的属性。例如:

class Object:
def __init__(self, a):
self.a = a
def aplus(self):
self.a += 1
object_list = [Object(1), Object(2), Object(3)]
# I want to execute this in parallel
for i in range(len(object_list)):
object_list[i].aplus() 

我尝试了以下方法:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
executor = ProcessPoolExecutor(max_workers=3)
res = executor.map([obj.aplus for obj in object_list])

这不起作用,使对象保持不变。我认为这是因为使用多处理只能复制对象,而不能访问对象。知道吗?

非常感谢!

编辑:假设对象很大,所以最好避免将它们复制到每个进程中。这些方法也被认为是非常占用CPU的,因此应该使用多个进程而不是线程。在这种情况下,我认为没有解决方案,因为多处理不能共享内存,线程不能使用多个CPU。不过,我希望被证明是错的。

下面是一个使用Pool.map:的工作示例

import multiprocessing
class Object:
def __init__(self, a):
self.a = a
def aplus(self):
self.a += 1
def __str__(self):
return str(self.a)
def worker(obj):
obj.aplus()
return obj
if __name__ == "__main__":
object_list = [Object(1), Object(2), Object(3)]
try:
processes = multiprocessing.cpu_count()
except NotImplementedError:
processes = 2
pool = multiprocessing.Pool(processes=processes)
modified_object_list = pool.map(worker, object_list)
for obj in modified_object_list:
print(obj)

打印:

2
3
4

这是我的答案,使用threading:

from threading import Thread
class Object:
def __init__(self, a):
self.a = a
def aplus(self):
self.a += 1
object_list = [Object(1), Object(2), Object(3)]
# A list containing all threads we will create
threads = []
# Create a thread for every objects
for obj in object_list:
thread = Thread(target=obj.aplus)
thread.daemon = True
thread.start()
threads.append(thread)
# Wait for all threads to finish before continuing
for thread in threads:
thread.join();
# prints results
for obj in object_list:
print(obj.a)

我认为这是因为对象只能被复制,而不能访问,具有多处理功能。

这是完全正确的,也是答案的一半。因为进程是孤立的,所以它们每个都有自己的object_list副本。这里的一个解决方案是使用ThreadPoolExecutor(线程都共享相同的object_list(。

使用它的语法与您尝试使用的有点不同,但它可以按预期工作:

executor = ThreadPoolExecutor(max_workers=3)
res = executor.map(Object.aplus, object_list)

如果您真的想使用ProcessPoolExecutor,那么您需要以某种方式从进程中获取数据。最简单的方法是使用返回值的函数:

from concurrent.futures import ProcessPoolExecutor

class Object:
def __init__(self, a):
self.a = a
def aplus(self):
self.a += 1
return self.a

if __name__ == '__main__':
object_list = [Object(1), Object(2), Object(3)]
executor = ProcessPoolExecutor(max_workers=3)
for result in executor.map(Object.aplus, object_list):
print("I got: " + str(result))

您甚至可以使用map返回self的函数,然后将返回的对象放回object_list中。因此,完整的多处理解决方案看起来像:

from concurrent.futures import ProcessPoolExecutor

class Object:
def __init__(self, a):
self.a = a
def aplus(self):
self.a += 1
return self

if __name__ == '__main__':
object_list = [Object(1), Object(2), Object(3)]
executor = ProcessPoolExecutor(max_workers=3)
object_list = list(executor.map(Object.aplus, object_list))

最新更新