我正在尝试实现一种高度并行化的在线递归并行算法。我的问题是我的 python 实现没有按照我想要的方式工作。我有两个 2D 矩阵,每次在时间步长 t 观察到新的观察值时,我想递归地更新每一列。 我的并行代码是这样的
def apply_async(t):
worker = mp.Pool(processes = 4)
for i in range(4):
X[:,i,np.newaxis], b[:,i,np.newaxis] = worker.apply_async(OULtraining, args=(train[t,i], X[:,i,np.newaxis], b[:,i,np.newaxis])).get()
worker.close()
worker.join()
for t in range(p,T):
count = 0
for l in range(p):
for k in range(4):
gn[count]=train[t-l-1,k]
count+=1
G = G*v + gn @ gn.T
Gt = (1/(t-p+1))*G
if __name__ == '__main__':
apply_async(t)
两个矩阵是 X 和 b。我想直接替换主内存,因为每个进程只递归更新矩阵的一个特定列。
为什么此实现比顺序实现慢?
有没有办法在每个时间步恢复该过程,而不是杀死它们并再次创建它们?这可能是它变慢的原因吗?
原因是,你的程序实际上是顺序的。 这是一个示例代码片段,从并行性的角度来看,它与您的代码片段相同:
from multiprocessing import Pool
from time import sleep
def gwork( qq):
print (qq)
sleep(1)
return 42
p = Pool(processes=4)
for q in range(1, 10):
p.apply_async(gwork, args=(q,)).get()
p.close()
p.join()
运行此按钮,您会注意到数字 1-9 每秒只出现一次。这是为什么呢? 原因是你的.get()
.这意味着对apply_async的每次调用实际上都会阻止get()
,直到结果可用。 它将提交一个任务,等待第二个模拟处理延迟,然后返回结果,之后另一个任务将提交到池中。 这意味着根本没有正在进行的并行执行。
尝试将池管理部分替换为以下内容:
results = []
for q in range(1, 10):
res = p.apply_async(gwork, args=(q,))
results.append(res)
p.close()
p.join()
for r in results:
print (r.get())
现在,您可以看到工作中的并行性,因为现在可以同时处理四个任务。您的循环不会在 get 中阻塞,因为 get 被移出循环,并且只有在结果准备就绪时才会收到结果。
注意:如果你对工作线程的参数或它们的返回值是大数据结构,你将失去一些性能。在实践中,Python 将这些实现为队列,与分叉子进程时获取数据结构的内存副本相比,通过队列传输大量数据在相对方面很慢。
我一直在攻击汉努的代码时遇到问题:
results = []
for q in range(1, 10):
res = p.apply_async(gwork, args=(q,))
results.append(res)
p.close()
p.join()
for r in results:
print (r.get())
问题是,当循环命中第一个 r.get() 时出现异常,整个程序都会退出,因为它没有得到正确处理。 我已经看到这种方法以几乎相同的方式发布了很多次,但总是导致相同的问题。
我最终将 r.get() 包装在一个 TRY/EXCEPT 块中,这允许程序处理列表中的所有异常并按设计继续。
from multiprocessing.pool import Pool
import traceback
results = []
pool = Pool(32)
process_schedule_data = pool.apply_async(TSMDataProcessor().process_schedule_data, args=("Schedule",))
# a bunch more calls like the one above but to different methods of the same class
pool.close()
pool.join()
for r in results:
try:
r.get()
except BaseException:
logger.error(f"data processor exception: {traceback.format_exc()}")