python:如何为多进程应用回调函数



在我的GUI应用程序中,我想使用multiprocessing来加速计算。现在,我可以使用multiprocessing,并收集计算结果。现在,我希望子流程可以通知主流程计算已经完成,但我找不到任何解决方案。

我的multiprocessing看起来像:

import multiprocessing
from multiprocessing import Process
import numpy as np
class MyProcess(Process):
def __init__(self,name, array):
super(MyProcess,self).__init__()
self.name = name
self.array = array
recv_end, send_end = multiprocessing.Pipe(False)
self.recv = recv_end
self.send = send_end
def run(self):
s = 0
for a in self.array:
s += a
self.send.send(s)
def getResult(self):
return self.recv.recv()

if __name__ == '__main__':
process_list = []
for i in range(5):
a = np.random.random(10)
print(i, ' correct result: ', a.sum())
p = MyProcess(str(i), a)
p.start()
process_list.append(p)
for p in process_list:
p.join()
for p in process_list:
print(p.name, ' subprocess result: ', p.getResult())

我希望子进程可以通知主进程计算已经完成,这样我就可以在GUI中显示结果。

欢迎任何建议~~~

假设您想在生成结果(在您的情况下是numpy数组的和(后立即对其进行处理,那么我将使用方法multiprocessing.pool.Pool和方法imap_unordered的多处理池,它将按生成的顺序返回结果。在这种情况下,您需要将要处理的数组列表中的数组索引与数组本身一起传递给辅助函数,并让它将该索引与数组的和一起返回,因为这是主进程知道为哪个数组生成了和的唯一方法:

from multiprocessing import Pool, cpu_count
import numpy as np
def compute_sum(tpl):
# unpack tuple:
i, array = tpl
s = 0
for a in array:
s += a
return i, s
if __name__ == '__main__':
array_list = [np.random.random(10) for _ in range(5)]
n = len(array_list)
pool_size = min(cpu_count(), n)
pool = Pool(pool_size)
# get result as soon as it has been returned:
for i, s in pool.imap_unordered(compute_sum, zip(range(n), array_list)):
print(f'correct result {i}: {array_list[i].sum()}, actual result: {s}')
pool.close()
pool.join()

打印:

correct result 0: 4.760033809335711, actual result: 4.76003380933571
correct result 1: 5.486818812843256, actual result: 5.486818812843257
correct result 2: 5.400374562564179, actual result: 5.400374562564179
correct result 3: 4.079376706247242, actual result: 4.079376706247242
correct result 4: 4.20860716467263, actual result: 4.20860716467263

在上面的运行中,生成的实际结果恰好与提交任务的顺序相同。为了证明通常情况下,根据工作函数计算结果所需的时间,结果可以按任意顺序生成,我们在处理时间中引入了一些随机性:

from multiprocessing import Pool, cpu_count
import numpy as np
def compute_sum(tpl):
import time
# unpack tuple:
i, array = tpl
# results will be generated in random order:
time.sleep(np.random.sample())
s = 0
for a in array:
s += a
return i, s
if __name__ == '__main__':
array_list = [np.random.random(10) for _ in range(5)]
n = len(array_list)
pool_size = min(cpu_count(), n)
pool = Pool(pool_size)
# get result as soon as it has been returned:
for i, s in pool.imap_unordered(compute_sum, zip(range(n), array_list)):
print(f'correct result {i}: {array_list[i].sum()}, actual result: {s}')
pool.close()
pool.join()

打印:

correct result 4: 6.662288433360379, actual result: 6.66228843336038
correct result 0: 3.352901187256162, actual result: 3.3529011872561614
correct result 3: 5.836344458981557, actual result: 5.836344458981557
correct result 2: 2.9950208717729656, actual result: 2.9950208717729656
correct result 1: 5.144743159869513, actual result: 5.144743159869513

如果您对在任务提交而不是任务完成顺序中返回结果感到满意,则使用方法imap,无需来回传递数组索引:

from multiprocessing import Pool, cpu_count
import numpy as np
def compute_sum(array):
s = 0
for a in array:
s += a
return s
if __name__ == '__main__':
array_list = [np.random.random(10) for _ in range(5)]
n = len(array_list)
pool_size = min(cpu_count(), n)
pool = Pool(pool_size)
for i, s in enumerate(pool.imap(compute_sum, array_list)):
print(f'correct result {i}: {array_list[i].sum()}, actual result: {s}')
pool.close()
pool.join()

打印:

correct result 0: 4.841913985702773, actual result: 4.841913985702773
correct result 1: 4.836923014762733, actual result: 4.836923014762733
correct result 2: 4.91242274200897, actual result: 4.91242274200897
correct result 3: 4.701913574838348, actual result: 4.701913574838349
correct result 4: 5.813666896917504, actual result: 5.813666896917503

更新

您还可以使用方法apply_async指定从工作函数compute_sum返回结果时要调用的回调函数。apply_async返回multiprocessing.pool.AsyncResult,其get方法将阻塞直到任务完成,并返回已完成任务的返回值。但在这里,由于我们使用的是回调函数,该函数将在任务完成时自动调用,而不是调用方法multiprocessing.pool.AsyncResult.get,因此无需保存AsyncResult实例。我们还依赖于调用方法multiprocessing.pool.Pool.close()multiprocessing.pool.Pool.join()来阻止,直到所有提交的任务都完成并返回结果:

from multiprocessing import Pool, cpu_count
import numpy as np
from functools import partial
def compute_sum(i, array):
s = 0
for a in array:
s += a
return i, s
def calculation_display(result, t):
# Unpack returned tuple:
i, s = t
print(f'correct result {i}: {array_list[i].sum()}, actual result: {s}')
result[i] = s
if __name__ == '__main__':
global array_list
array_list = [np.random.random(10) for _ in range(5)]
n = len(array_list)
result = [0] * n
pool_size = min(cpu_count(), n)
pool = Pool(pool_size)
# Get result as soon as it has been returned.
# Pass to our callback as the first argument the results list.
# The return value will now be the second argument:
my_callback = partial(calculation_display, result)
for i, array in enumerate(array_list):
pool.apply_async(compute_sum, args=(i, array), callback=my_callback)
# Wait for all submitted tasks to complete:
pool.close()
pool.join()
print('results:', result)

打印:

correct result 0: 5.381579338696546, actual result: 5.381579338696546
correct result 1: 3.8780497856741274, actual result: 3.8780497856741274
correct result 2: 4.548733927791488, actual result: 4.548733927791488
correct result 3: 5.048921365623381, actual result: 5.048921365623381
correct result 4: 4.852415747983676, actual result: 4.852415747983676
results: [5.381579338696546, 3.8780497856741274, 4.548733927791488, 5.048921365623381, 4.852415747983676]

相关内容

  • 没有找到相关文章

最新更新