我已经有了要切换到多进程的原始功能:
def optimal(t0, tf, frequences, delay, ratio = 0):
First = True # First
for s in delay:
delay = 0 # delay between signals,
timelines = list()
for i in range(len(frequences)):
timelines.append(time_builder(frequences[i], t0+delay, tf))
delay += s
trio_overlap = trio_combination(timelines, ratio)
valid = True
for items in trio_overlap.values():
if len(list(set(items))) == len(items):
continue
else:
valid = False
if not valid:
continue
overlap = duo_combination(timelines)
optimal = ... depending of conditions
return optimal
如果valid = True
在测试后,它将计算一个称为optim_param
的优化参数,并尝试将其最小化。如果它达到一定的阈值,optim_param < 0.3
,我会脱离循环并将这个值作为我的答案。
我的问题是,随着我开发模型,复杂性开始上升,单线计算花费太长。我想并行处理计算。由于每个过程都必须将带有S值的结果与当前的最优值进行比较,因此我试图实现队列。
这是我第一次进行多处理,即使我认为自己在正确的轨道上,我也觉得我的代码凌乱而不完整。我可以得到一些帮助吗?
谢谢:D
而不是为每种情况创建一个过程,而是考虑使用Pool.imap_unordered
。诀窍是如何在获得可传递结果时清洁关闭:您可以通过通过设置标志以检查每个循环的标志,通过传递早期退出的发电机来实现此功能。主要程序从迭代器中读取,保持最佳的结果,并在足够好的情况下设置标志。最后的技巧是减慢发电机的(内部(线程读数,以防止在获得良好结果后必须等待(或不清洁的(预定任务的大量积压。鉴于池中的过程数量,可以通过信号量来实现起搏。
这是一个示例(带有琐碎的分析(,以证明:
import multiprocessing,threading,os
def interrupted(data,sem,interrupt):
for x in data:
yield x
sem.acquire()
if interrupt: break
def analyze(x): return x**2
np=os.cpu_count()
pool=multiprocessing.Pool(np)
sem=threading.Semaphore(np-1)
token=[] # mutable
vals=pool.imap_unordered(analyze,interrupted(range(-10,10),sem,token))
pool.close() # optional: to let processes exit faster
best=None
for res in vals:
if best is None or res<best:
best=res
if best<5: token.append(None) # make it truthy
sem.release()
pool.join()
print(best)
当然还有其他方法可以与发电机共享信号量和中断标志;这样,使用丑陋的数据类型,但具有不使用全局变量(甚至封闭(的优点。