如何使用多处理在 for 循环中并行化对同一函数的两个调用,具有不同的参数?



在for循环中,我调用一个函数两次,但使用不同的参数集(argSet1argSet2),这些参数集在for循环的每次迭代中都会改变。我想并行化此操作,因为一组参数导致被调用函数运行得更快,而另一组参数导致函数运行缓慢。请注意,我不希望此操作有两个 for 循环。我还有另一个要求:这些函数中的每一个都将执行一些并行操作,因此我不希望任何具有argSet1argSet2的函数运行多次,因为我拥有的计算资源有限。确保具有两个参数集的函数正在运行将有助于我尽可能多地利用 CPU 内核。以下是在没有并行化的情况下通常如何执行此操作:

def myFunc(arg1, arg2):
if arg1:
print ('do something that does not take too long')
else:
print ('do something that takes long')
for i in range(10):
argSet1 = arg1Storage[i]
argSet1 = arg2Storage[i]
myFunc(argSet1)
myFunc(argSet2)

这绝对不会利用我拥有的计算资源。这是我尝试并行化操作的方法:

from multiprocessing import Process
def myFunc(arg1, arg2):
if arg1:
print ('do something that does not take too long')
else:
print ('do something that takes long')
for i in range(10):
argSet1 = arg1Storage[i]
argSet1 = arg2Storage[i]
p1 = Process(target=myFunc, args=argSet1)
p1.start()
p2 = Process(target=myFunc, args=argSet2)
p2.start()

但是,这样每个函数及其各自的参数将被调用 10 次,并且事情变得非常慢。鉴于我对多处理的了解有限,我试图通过在 for 循环的末尾添加p1.join()p2.join()来进一步改进事情,但这仍然会导致速度变慢,因为p1完成得更快,事情等到p2完成。我还考虑过使用multiprocessing.Value与函数进行一些通信,但随后我必须在函数内为每个函数调用添加一个while循环,这会再次减慢一切。我想知道是否有人可以提供实用的解决方案?

由于我在补丁中构建了这个答案,因此向下滚动以获取此问题的最佳解决方案

您需要准确指定您希望如何运行。据我所知,您最多希望运行两个进程,但至少也希望运行。此外,您不希望沉重的呼叫阻碍快速呼叫。一种简单的非最佳运行方式是:

from multiprocessing import Process
def func(counter,somearg):
j = 0
for i in range(counter): j+=i
print(somearg)
def loop(counter,arglist):
for i in range(10):
func(counter,arglist[i])
heavy = Process(target=loop,args=[1000000,['heavy'+str(i) for i in range(10)]])
light = Process(target=loop,args=[500000,['light'+str(i) for i in range(10)]])
heavy.start()
light.start()
heavy.join()
light.join()

此处的输出是(对于一个示例运行):

light0
heavy0
light1
light2
heavy1
light3
light4
heavy2
light5
light6
heavy3
light7
light8
heavy4
light9
heavy5
heavy6
heavy7
heavy8
heavy9

您可以看到最后一部分是次优的,因为您有一系列繁重的运行 - 这意味着有一个过程而不是两个。

优化这一点的简单方法,如果您可以估计繁重的进程运行多长时间。如果它的速度是这里的两倍,只需先运行 7 次 heavy 迭代,加入轻量级进程,然后让它运行额外的 3 次。

另一种方法是成对运行繁重的进程,因此首先你有 3 个进程,直到快速进程结束,然后继续 2 个。

重点是将重调用和轻调用完全分离到另一个进程 - 因此,当快速调用一个接一个地快速完成时,您可以处理慢速内容。一旦斋戒结束,这取决于你想继续多详细,但我认为现在估计如何打破繁重的电话已经足够了。这是我的例子:

from multiprocessing import Process
def func(counter,somearg):
j = 0
for i in range(counter): j+=i
print(somearg)
def loop(counter,amount,arglist):
for i in range(amount):
func(counter,arglist[i])
heavy1 = Process(target=loop,args=[1000000,7,['heavy1'+str(i) for i in range(7)]])
light = Process(target=loop,args=[500000,10,['light'+str(i) for i in range(10)]])
heavy2 = Process(target=loop,args=[1000000,3,['heavy2'+str(i) for i in range(7,10)]])
heavy1.start()
light.start()
light.join()
heavy2.start()
heavy1.join()
heavy2.join()

带输出:

light0
heavy10
light1
light2
heavy11
light3
light4
heavy12
light5
light6
heavy13
light7
light8
heavy14
light9
heavy15
heavy27
heavy16
heavy28
heavy29

利用率要好得多。当然,您可以通过为慢速进程运行共享队列来使其更高级,因此当快速运行时,他们可以作为慢速队列上的工作线程加入,但对于只有两个不同的调用,这可能是矫枉过正(尽管使用队列并不难)。最佳解决方案

from multiprocessing import Queue,Process
import queue
def func(index,counter,somearg):
j = 0
for i in range(counter): j+=i
print("Worker",index,':',somearg)
def worker(index):
try:
while True:
func,args = q.get(block=False)
func(index,*args)
except queue.Empty: pass
q = Queue()
for i in range(10):
q.put((func,(500000,'light'+str(i))))
q.put((func,(1000000,'heavy'+str(i))))
nworkers = 2
workers = []
for i in range(nworkers):
workers.append(Process(target=worker,args=(i,)))
workers[-1].start()
q.close()
for worker in workers:
worker.join()

这是您想要的最佳和最具可扩展性的解决方案。输出:

Worker 0 : light0
Worker 0 : light1
Worker 1 : heavy0
Worker 1 : light2
Worker 0 : heavy1
Worker 0 : light3
Worker 1 : heavy2
Worker 1 : light4
Worker 0 : heavy3
Worker 0 : light5
Worker 1 : heavy4
Worker 1 : light6
Worker 0 : heavy5
Worker 0 : light7
Worker 1 : heavy6
Worker 1 : light8
Worker 0 : heavy7
Worker 0 : light9
Worker 1 : heavy8
Worker 0 : heavy9

您可能希望使用multiprocessing.Pool流程并将myFunc映射到其中,如下所示:

from multiprocessing import Pool
import time
def myFunc(arg1, arg2):
if arg1:
print ('do something that does not take too long')
time.sleep(0.01)
else:
print ('do something that takes long')
time.sleep(1)
def wrap(args):
return myFunc(*args)
if __name__ == "__main__":
p = Pool()
argStorage = [(True, False), (False, True)] * 12
p.map(wrap, argStorage)

我添加了一个wrap函数,因为传递给p.map的函数必须接受单个参数。您也可以调整myFunc接受元组,如果这可能在您的情况下。

我的样本appStorage24 个项目的常量,其中 12 个项目需要 1 秒来处理,12 个项目将在 10 毫秒内完成。总的来说,这个脚本在 3-4 秒内运行(我有 4 个内核)。

一种可能的实现方式如下:

import concurrent.futures
import math
list_of_args = [arg1, arg2]
def my_func(arg):
....
print ('do something that takes long')
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for arg, result in zip(list_of_args, executor.map(is_prime, list_of_args)):
print('my_func({0}) => {1}'.format(arg, result))

executor.map与内置函数类似,map 方法允许对提供的函数进行多次调用,将可迭代对象中的每个项传递给该函数。

相关内容

  • 没有找到相关文章

最新更新