我有一个函数,我将使用多处理来运行它。然而,函数返回一个值,我不知道一旦完成,如何存储该值。
我在网上读到过关于使用队列的文章,但我不知道如何实现它,也不知道它是否有效。
cores = []
for i in range(os.cpu_count()):
cores.append(Process(target=processImages, args=(dataSets[i],)))
for core in cores:
core.start()
for core in cores:
core.join()
其中函数"processImages"返回一个值。如何保存返回的值?
在代码片段中,您输入了数据集,这是一个未指定大小的列表。您有一个函数processImages,它接受一个dataSet元素,并显然返回您想要捕获的值。
cpucount==数据集长度
我注意到的第一个问题是os.cpu_count((驱动I的值范围,然后决定处理哪些数据集。我想你会更喜欢这两件事独立。也就是说,你希望能够处理一些X数量的数据集,并且你希望它能在任何机器上工作,具有1-1000(或更多…(个内核。
关于CPU绑定工作的旁白
我还将假设您已经确定任务实际上是CPU绑定的,因此按核心划分是有意义的。相反,如果您的任务是磁盘io绑定的,则需要更多的工作人员。您也可以是内存绑定或缓存绑定。如果最佳并行化对您来说很重要,那么您应该考虑进行一些试验,看看哪种数量的工作程序真正能给您带来最大的性能。
如果你喜欢,这里有更多阅读
Pool类
无论如何,正如Michael Butscher所提到的,Pool类为您简化了这一点。您的是一个标准用例。你有一组工作要做(你要处理的数据集列表(,还有很多工作人员要做(在你的代码片段中,你的核心数量(。
TLDR
使用像这样简单的多处理概念:
from multiprocessing import Pool
# Renaming this variable just for clarity of the example here
work_queue = datasets
# This is the number you might want to find experimentally. Or just run with cpu_count()
worker_count = os.cpu_count()
# This will create processes (fork) and join all for you behind the scenes
worker_pool = Pool(worker_count)
# Farm out the work, gather the results. Does not care whether dataset count equals cpu count
processed_work = worker_pool.map(processImages, work_queue)
# Do something with the result
print(processed_work)
不能从其他进程返回变量。建议的方法是创建一个Queue
(multiprocessing.Queue(,然后让您的子流程将结果放入该队列,完成后,您可以读回它们——如果您有很多结果,这是有效的。
如果你只需要一个数字——使用Value
或Array
可能会更容易。
请记住,不能使用简单的变量,它必须用multiprocessing
lib中的上述类包装。
如果您想使用多处理返回的结果object
,请尝试此
from multiprocessing.pool import ThreadPool
def fun(fun_argument1, ... , fun_argumentn):
<blabla>
return object_1, object_2
pool = ThreadPool(processes=number_of_your_process)
async_num1 = pool.apply_async(fun, (fun_argument1, ... , fun_argumentn))
object_1, object_2 = async_num1.get()
然后你可以做任何你想做的事。