我有以下功能:
def function(arguments, process_number):
calclulations that build vector_1 and vector_2 of size 100000 each
return vector_1, vector_2
对于process_number的200个不同值,我需要并行运行此函数。我对按process_number排序输出不感兴趣,所以我使用apply_async((,如下所示:
def parallelizing():
number_of_process
pool = mp.Pool(mp.cpu_count())
size = 100000
vector_1 = np.zeros(size)
vector_2 = np.zeros(size)
returned_object = [pool.apply_async(function,args=(args, procnum)) for procnum in range(number_of_process)]
pool.close()
pool.join()
for r in returned_object:
vector_1 += r.get()[0]
vector_2 += r.get()[1]
所以,在一天结束的时候,我只需要把我从并行化函数中得到的所有向量加在一起。
问题是,该过程使用的大部分时间实际上是存储内存来构建[returned_object]列表,我认为这真的没有必要,因为我只需要添加函数的返回,然后忘记它。
因此,我试图避免构建一个庞大的对象列表,每个对象至少包含两个100000大小的浮动向量,而是直接进入"添加"步骤。
有办法吗?我应该定义一个全局变量并在函数中对其进行写入吗?我担心这可能会导致并发,并把事情搞砸。正如我所说,我真的不在乎得到一个有序的结果,因为我只需要把事情加起来。
从下面的Cireo答案编辑:
好的,只是为了确认我是否理解。我不使用apply_async方法,而是做如下操作:
def function(args, queue_1, queue_2):
#do calculation
queue.put(vector_1)
queue.put(vector_2)
在调用并行化的函数中,我只做
def parallelizing():
queue1 = Queue()
queue2=Queue()
processid = np.arange(200)
p = Process(target=f, args=(args,queue1,queue2,prcoessid))
p.start()
p.join()
我真正不明白的是,我如何添加函数的返回,而不是把事情放在队列中,这在我看来像创建列表一样计算密集。如果我执行Queue.put((,我不会得到和以前相同的列表吗?
听起来你已经自我诊断出这个问题与相同
共享多处理中的列表列表
根本不创建列表,只需使用Queue或类似方法将所有子流程的结果发送回主流程即可。你说过你不在乎秩序,所以这应该很简单。
您还可以研究使用共享内存,或者只是在进程之间共享对象(尽管第二种选择可能不够快(
https://docs.python.org/3/library/multiprocessing.shared_memory.html
我不太熟悉multiprocessing
模块,但我熟悉threading
,我不确定哪一个性能更好,但这就是我线程的方式:
import numpy as np
from threading import Thread
def function(arguments, process_number, results, index):
print(arguments, process_number)
#calclulations that build vector_1 and vector_2 of size 100000 each
# example arrays/vectors
vector_1 = np.zeros(10000)
vector_2 = np.zeros(10000)
results[index] = (vector_1, vector_2)
count = 200
threads = [None] * count
results = [None] * count
args = ("example", "arguments")
process_numbers = range(1, count + 1)
for process_number, i in zip(process_numbers, range(count)):
threads[i] = Thread(target=function, args=(args,process_number, results, i))
threads[i].start()
for i in range(count):
threads[i].join()
size = 10000
vector_1 = np.zeros(size)
vector_2 = np.zeros(size)
for result in results:
vector_1 += result[0]
vector_2 += result[1]