我正在处理一个需要运行两个不同的CPU密集型函数的项目。因此,使用多进程方法似乎是要走的路。我面临的挑战是一个函数的运行时间比另一个函数慢。为了便于讨论,假设execute
的运行时间为 .1 秒,而update
运行需要整整一秒。目标是在update
运行时,execute
将计算输出值 10 次。完成后update
需要将一组参数传递给execute
然后可以继续使用新参数集生成输出。一段时间后update
需要再次运行并再次生成一组新的参数。
此外,这两个函数都需要一组不同的输入变量。
下面的图像链接应该希望能更好地可视化我的难题。
函数运行时可视化
从我收集到的信息(https://zetcode.com/python/multiprocessing/)来看,使用非对称映射方法可能是要走的路,但它似乎并不真正有效。任何帮助将不胜感激。
伪代码
from multiprocessing import Pool
from datetime import datetime
import time
import numpy as np
class MyClass():
def __init__(self, inital_parameter_1, inital_parameter_2):
self.parameter_1 = inital_parameter_1
self.parameter_2 = inital_parameter_2
def execute(self, input_1, input_2, time_in):
print('starting execute function for time:' + str(time_in))
time.sleep(0.1) # wait for 100 milliseconds
# generate some output
output = (self.parameter_1 * input_1) + (self.parameter_2 + input_2)
print('exiting execute function')
return output
def update(self, update_input_1, update_input_2, time_in):
print('starting update function for time:' + str(time_in))
time.sleep(1) # wait for 1 second
# generate parameters
self.parameter_1 += update_input_1
self.parameter_2 += update_input_2
print('exiting update function')
def smap(f):
return f()
if __name__ == "__main__":
update_input_1 = 3
update_input_2 = 4
input_1 = 0
input_2 = 1
# initialize class
my_class = MyClass(1, 2)
# total runtime (arbitrary)
runtime = int(10e6)
# update_time (arbitrary)
update_time = np.array([10, 10e2, 15e4, 20e5])
for current_time in range(runtime):
# if time equals update time run both functions simultanously until update is complete
if any(update_time == current_time):
with Pool() as pool:
res = pool.map_async(my_class.smap, [my_class.execute(input_1, input_2, current_time),
my_class.update(update_input_1, update_input_2, current_time)])
# otherwise run only execute
else:
output = my_class.execute(input_1, input_2,current_time)
# increment input
input_1 += 1
input_2 += 2
我承认无法完全遵循您的代码而不是您的描述。但我看到一些问题:
- 方法
update
不返回除None
以外的任何值,由于缺少return
语句,该值被隐式返回。 - 您的
with Pool() ...:
块将在块退出时调用terminate
,这是在您调用pool.map_async
之后立即调用的,这是非阻塞的。但是您没有等待此提交任务完成的规定(terminate
很可能会在正在运行的任务完成之前将其终止)。 - 传递给
map_async
函数的是工作器函数名称和可迭代对象。但是您正在调用方法调用以从当前主进程中execute
和update
,并将其返回值用作可迭代对象的元素,这些返回值绝对不适合传递给smap
的函数。所以没有进行多处理,这是完全错误的。 - 您还在一遍又一遍地创建和销毁进程池。最好只创建一次进程池。
因此,我建议至少进行以下更改。但请注意,此代码生成任务的速度可能比完成任务快得多,并且根据当前runtime
值,您可能会有数百万个任务排队等待运行,这可能会对内存等系统资源造成相当大的压力。因此,我插入了一些代码,以确保限制提交任务的速率,以便未完成的提交任务数永远不会超过可用 CPU 内核数的三倍。
# we won't need heavy-duty numpy for what we are doing:
#import numpy as np
from multiprocessing import cpu_count
from threading import Lock
... # etc.
if __name__ == "__main__":
update_input_1 = 3
update_input_2 = 4
input_1 = 0
input_2 = 1
# initialize class
my_class = MyClass(1, 2)
# total runtime (arbitrary)
runtime = int(10e6)
# update_time (arbitrary)
# we don't need overhead of numpy (remove import of numpy):
#update_time = np.array([10, 10e2, 15e4, 20e5])
update_time = [10, 10e2, 15e4, 20e5]
tasks_submitted = 0
lock = Lock()
execute_output = []
def execute_result(result):
global tasks_submitted
with lock:
tasks_submitted -= 1
# result is the return value from method execute
# do something with it, e.g. execute_output.append(result)
pass
update_output = []
def update_result(result):
global tasks_submitted
with lock:
tasks_submitted -= 1
# result is the return value from method update
# do something with it, e.g. update_output.append(result)
pass
n_processors = cpu_count()
with Pool() as pool:
for current_time in range(runtime):
# if time equals update time run both functions simultanously until update is complete
#if any(update_time == current_time):
if current_time in update_time:
# run both update and execute:
pool.apply_async(my_class.update, args=(update_input_1, update_input_2, current_time), callback=update_result)
with lock:
tasks_submitted += 1
pool.apply_async(my_class.execute, args=(input_1, input_2, current_time), callback=execute_result)
with lock:
tasks_submitted += 1
# increment input
input_1 += 1
input_2 += 2
while tasks_submitted > n_processors * 3:
time.sleep(.05)
# Ensure all tasks have completed:
pool.close()
pool.join()
assert(tasks_submitted == 0)