Multiprocessing in Python with conditional process spawning



I have a single-threaded function that I would like to parallelize. The code is a bit too complex to show here, but here is a model of its behavior:

from time import sleep

R = list(range(4))

def compute(val):
    res = sum(val)
    if res % 2 == 0:          # first condition on res
        sleep(0.5)            # expensive operation
        if res % 4 == 0:      # second condition on res
            sleep(0.5)        # expensive operation
            return 2
        else:
            return 1
    else:
        return 1

def f_single(idx, val):
    if idx == len(R):
        return 1
    else:
        val = val + [R[idx]]
        ret = compute(val)
        if ret == 1:
            return f_single(idx + 1, val)
        else:
            # DISJUNCT
            return f_single(idx + 1, val) + f_single(idx + 1, val)

Basically, each recursion updates the variable val, and depending on the result of compute(val), which is an expensive computation, I may have to perform a double call under certain conditions. (Side note: this implementation does not scale to large lists, since I hit a stack overflow fairly quickly; the multiprocessing work is also an excuse to rewrite this code.)
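As an aside on the stack issue: the recursion in f_single can be rewritten with an explicit worklist, which removes the recursion limit and is also the shape that parallelizes naturally. A minimal sketch, reusing the compute and R definitions from the model above (f_iter is an illustrative name, not from the original code):

```python
from time import sleep

R = list(range(4))

def compute(val):
    # returns 2 iff sum(val) is a multiple of 4 (after the "expensive" sleeps)
    res = sum(val)
    if res % 2 == 0:
        sleep(0.5)
        if res % 4 == 0:
            sleep(0.5)
            return 2
    return 1

def f_iter():
    # an explicit worklist replaces the recursion, so the list length
    # is no longer bounded by the interpreter's recursion limit
    total = 0
    stack = [(0, [])]
    while stack:
        idx, val = stack.pop()
        if idx == len(R):
            total += 1                      # a completed path counts as 1
            continue
        val = val + [R[idx]]
        if compute(val) == 2:
            stack.append((idx + 1, val))    # DISJUNCT: second identical branch
        stack.append((idx + 1, val))
    return total
```

Each (idx, val) pair on the stack is exactly the state a recursive call would carry, which is also the unit of work a process pool can pick up.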

Ideally, I would like to spawn a new process for each new call to f_single.

I started refactoring the code like this:

from time import time, sleep
from concurrent.futures import ProcessPoolExecutor

# list of indices
def process_idxs():
    return list(range(len(R)))

def are_two_path(idx, val):
    val = val + [R[idx]]
    ret = compute(val)
    if ret == 1:
        return False          # simulate "only one path"
    return True               # simulate "two paths available"

if __name__ == '__main__':
    ret = f_single(0, [])
    now = time()
    idxs = process_idxs()
    # start a job when the job queue is not full
    # when a job is complete, return the results (solvers with call stacks)
    # add the new results to the job queue
    # the program terminates when the job queue is done
    # TODO: how to do this?
    val = []                  # starting value for each task
    with ProcessPoolExecutor(max_workers=12) as executor:
        for idx in idxs:
            f = executor.submit(are_two_path, idx, val)
            print(f.result())
    print("multi proc: ", time() - now, "s")

I don't know how to write the parallelization routine so that it produces the same return value as f_single (the last few lines are an attempt to do so).

Looking at concurrent.futures, I did not find an easy way to collect the result of the computation for the current index, conditionally spawn the processes, and perform the next recursion while passing the updated value of val.

I don't have any shared state except R, which is read-only, so that shouldn't be a problem.

Do you have any suggestions or pointers on how to turn f_single into a multiprocessing function?
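For what it's worth, one shape such a routine can take is a worklist driven from the parent process: each task advances one branch by one index and reports either follow-up tasks or a completed path. A hedged sketch using concurrent.futures (expand and f_parallel are illustrative names; for the model above it returns the same value as f_single(0, [])):

```python
from time import sleep
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED

R = list(range(4))

def compute(val):
    # returns 2 iff sum(val) is a multiple of 4 (after the "expensive" sleeps)
    res = sum(val)
    if res % 2 == 0:
        sleep(0.5)
        if res % 4 == 0:
            sleep(0.5)
            return 2
    return 1

def expand(idx, val):
    """Advance one branch by one index.
    Returns (follow-up tasks, leaf count) so the parent can drive the search."""
    if idx == len(R):
        return [], 1                                  # completed path: counts as 1
    val = val + [R[idx]]
    if compute(val) == 2:
        return [(idx + 1, val), (idx + 1, val)], 0    # DISJUNCT: two branches
    return [(idx + 1, val)], 0

def f_parallel(workers=4):
    # worklist in the parent: collect finished tasks, submit their children
    total = 0
    with ProcessPoolExecutor(max_workers=workers) as ex:
        pending = {ex.submit(expand, 0, [])}
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                children, leaves = fut.result()
                total += leaves
                for task in children:
                    pending.add(ex.submit(expand, *task))
    return total

if __name__ == '__main__':
    print(f_parallel())
```

Keeping the loop in the parent avoids workers submitting to the pool themselves, and the conditional spawn is just the length of the returned task list.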

One possible way is to do the following:

import os
from time import time, sleep
from multiprocessing import Queue, Process
from queue import Empty

R = list(range(16))
NUMBER_OF_PROCESSES = 32
TIMEOUT = 1

def compute(val):
    res = sum(val)
    if res % 2 == 0:          # first condition on res
        sleep(1)              # expensive operation
        if res % 4 == 0:      # second condition on res
            sleep(1)          # expensive operation
            return 2
        else:
            return 1
    else:
        return 1

def are_two_path(idx, val):
    val = val + [R[idx]]
    ret = compute(val)
    if ret == 1:
        return False          # simulate "only one path"
    return True               # simulate "two paths available"

def worker(q, r, start_val, start_idx):
    """Worker spawned in a new process, in charge of
    going through the list iteratively.
    Sends a new job to the tasks queue if two paths are available.
    """
    val = start_val
    for idx in range(start_idx, len(R) + 1):
        if idx == len(R):
            r.put(1)
        else:
            result = are_two_path(idx, val)
            if result:
                q.put((idx + 1, val + [R[idx]]))
            val = val + [R[idx]]

def overseer():
    """Running in the initial process,
    this function creates the tasks and results queues,
    maintains the number of currently running processes,
    and spawns new processes when there is enough room.
    """
    tasks = Queue()
    results = Queue()
    init_p = Process(target=worker,
                     args=(tasks, results, [], 0))
    init_p.start()
    working = 1
    completed_last_cycle = 0
    while True:
        completed_tasks = results.qsize()
        if working < NUMBER_OF_PROCESSES:
            # if there is enough room in the working queue,
            # spawn a new process and add it
            try:
                (idx, val) = tasks.get(timeout=5)
            except Empty:
                break
            p = Process(target=worker, args=(tasks, results, val, idx))
            p.start()
            working += 1
        if completed_tasks > completed_last_cycle:
            # if some processes terminated during the last cycle,
            # update the working counter
            working -= (completed_tasks - completed_last_cycle)
            completed_last_cycle = completed_tasks
    tasks.close()
    tasks.join_thread()
    results.close()
    results.join_thread()
    return results

def test():
    res = overseer()
    print("Number of case splits: ", res.qsize())

if __name__ == '__main__':
    now = time()
    test()
    print("multi proc: ", time() - now, "s")
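One portability caveat with the code above: multiprocessing.Queue.qsize() relies on sem_getvalue(), which is not implemented on macOS, where it raises NotImplementedError. Counting the results by draining the queue avoids this (drain is an illustrative helper, not part of the solution above):

```python
from multiprocessing import Queue
from queue import Empty

def drain(q):
    """Pull every item currently in the queue into a list.
    The small timeout gives the queue's feeder thread time to flush."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=0.5))
        except Empty:
            break
    return items

if __name__ == '__main__':
    q = Queue()
    for _ in range(3):
        q.put(1)
    print(len(drain(q)))
```

len(drain(results)) would then replace results.qsize() in the final count.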
