有没有办法在可用时累积pool.apply_async调用的结果,而无需将它们收集在类似列表的结构中?



在循环中,我使用 python pool.apply_async(( 调用发送作业。

下面的python代码最初是由我编写的,然后由@Santiago Magariños编辑

import multiprocessing
import numpy as np
from time import time, sleep
from random import random
chrNames=['chr1','chr2','chr3']
sims=[1,2,3]

def accumulate_chrBased_simBased_result(chrBased_simBased_result,accumulatedSignalArray,accumulatedCountArray):    
signalArray = chrBased_simBased_result[0]
countArray = chrBased_simBased_result[1]
accumulatedSignalArray += signalArray
accumulatedCountArray += countArray

def func(chrName,simNum):
result=[]
sleep(random()*5)
signal_array=np.full((10000,), simNum, dtype=float)
count_array = np.full((10000,), simNum, dtype=int)
result.append(signal_array)
result.append(count_array)
print('%s %d' %(chrName,simNum))
return result

if __name__ == '__main__':
accumulatedSignalArray = np.zeros((10000,), dtype=float)
accumulatedCountArray = np.zeros((10000,), dtype=int)
numofProcesses = multiprocessing.cpu_count()
pool = multiprocessing.Pool(numofProcesses)
results = []
for chrName in chrNames:
for simNum in sims:
results.append(pool.apply_async(func, (chrName,simNum,)))
for i in results:
print(i)
while results:
for r in results[:]:
if r.ready():
print('{} is ready'.format(r))
accumulate_chrBased_simBased_result(r.get(),accumulatedSignalArray,accumulatedCountArray)
results.remove(r)
pool.close()
pool.join()
print(accumulatedSignalArray)
print(accumulatedCountArray)

有没有办法在 pool.apply_async(( 调用的结果可用时累积它,而无需将它们收集在类似结构的列表中?

这样的东西可以工作。几乎复制您的代码并添加回调,请注意,这是有效的,因为在我们访问累加器值之前,池已加入。如果没有,我们需要一些其他类型的同步机制

class Accumulator:
def __init__(self):
self.signal = np.zeros((10000,), dtype=float)
self.count = np.zeros((10000,), dtype=int)
def on_result(self, result):
self.signal += result[0]
self.count += result[1]
if __name__ == '__main__':
num_proc = multiprocessing.cpu_count()
pool = multiprocessing.Pool(num_proc)
accumulator = Accumulator()
for chrName in chrNames:
for simNum in sims:
pool.apply_async(func, (chrName,simNum,), callback=accumulator.on_result)
pool.close()
pool.join()
print(accumulator.signal)
print(accumulator.count)

相关内容

  • 没有找到相关文章

最新更新