与共享队列和结束标准进行多处理



我已经有了要切换到多进程的原始功能:

def optimal(t0, tf, frequences, delay, ratio = 0):
    First = True                            # First        
    for s in delay:
        delay = 0                           # delay between signals,
        timelines = list()
        for i in range(len(frequences)):
            timelines.append(time_builder(frequences[i], t0+delay, tf))
            delay += s
       trio_overlap = trio_combination(timelines, ratio)
        valid = True
        for items in trio_overlap.values():
            if len(list(set(items))) == len(items):
                continue
            else:
                valid = False
        if not valid:
            continue
        overlap = duo_combination(timelines)
    optimal = ... depending of conditions        
    return optimal

如果valid = True在测试后,它将计算一个称为optim_param的优化参数,并尝试将其最小化。如果它达到一定的阈值,optim_param < 0.3,我会脱离循环并将这个值作为我的答案。

我的问题是,随着我开发模型,复杂性开始上升,单线计算花费太长。我想并行处理计算。由于每个过程都必须将带有S值的结果与当前的最优值进行比较,因此我试图实现队列。

这是我第一次进行多处理,即使我认为自己在正确的轨道上,我也觉得我的代码凌乱而不完整。我可以得到一些帮助吗?

谢谢:D

而不是为每种情况创建一个过程,而是考虑使用Pool.imap_unordered。诀窍是如何在获得可传递结果时清洁关闭:您可以通过通过设置标志以检查每个循环的标志,通过传递早期退出的发电机来实现此功能。主要程序从迭代器中读取,保持最佳的结果,并在足够好的情况下设置标志。最后的技巧是减慢发电机的(内部(线程读数,以防止在获得良好结果后必须等待(或不清洁的(预定任务的大量积压。鉴于池中的过程数量,可以通过信号量来实现起搏。

这是一个示例(带有琐碎的分析(,以证明:

import multiprocessing,threading,os
def interrupted(data,sem,interrupt):
  for x in data:
    yield x
    sem.acquire()
    if interrupt: break
def analyze(x): return x**2
np=os.cpu_count()
pool=multiprocessing.Pool(np)
sem=threading.Semaphore(np-1)
token=[]                        # mutable
vals=pool.imap_unordered(analyze,interrupted(range(-10,10),sem,token))
pool.close()                    # optional: to let processes exit faster
best=None
for res in vals:
  if best is None or res<best:
    best=res
    if best<5: token.append(None) # make it truthy
  sem.release()
pool.join()
print(best)

当然还有其他方法可以与发电机共享信号量和中断标志;这样,使用丑陋的数据类型,但具有不使用全局变量(甚至封闭(的优点。

相关内容

  • 没有找到相关文章

最新更新