我正在尝试启动一个函数(my_function
(,并在达到一定时间后停止执行。所以我挑战了multiprocessing
库,一切都很好。这是代码,其中my_function()
已更改为仅创建一个伪消息。
from multiprocessing import Queue, Process
from multiprocessing.queues import Empty
import time
timeout=1
# timeout=3
def my_function(something):
time.sleep(2)
return f'my message: {something}'
def wrapper(something, queue):
message ="too late..."
try:
message = my_function(something)
return message
finally:
queue.put(message)
try:
queue = Queue()
params = ("hello", queue)
child_process = Process(target=wrapper, args=params)
child_process.start()
output = queue.get(timeout=timeout)
print(f"ok: {output}")
except Empty:
timeout_message = f"Timeout {timeout}s reached"
print(timeout_message)
finally:
if 'child_process' in locals():
child_process.kill()
您可以测试并验证根据timeout=1
或timeout=3
,我是否可以触发错误。
我的主要问题是,真正的my_function()
是一个torch
模型推理,我想将的线程数量限制在4个
如果my_function
在主进程中,可以很容易地做到这一点,但在我的示例中,我尝试了很多技巧来将其限制在子进程中,但都没有成功(使用threadpoolctl.threadpool_limits(4)
、torch.set_num_threads(4)
、os.environ["OMP_NUM_THREADS"]=4
、os.environ["MKL_NUM_THREADS"]=4
(。
我对其他解决方案完全开放,这些解决方案可以监控函数的执行时间,同时限制该函数使用的线程数量。
谢谢问候
您可以使用Pool限制同时处理。(https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool(您可以设置每个孩子完成的最大任务数。看看吧。
这里有一个来自杰森·布朗利的superfastpython的样本:
# SuperFastPython.com
# example of limiting the number of tasks per child in the process pool
from time import sleep
from multiprocessing.pool import Pool
from multiprocessing import current_process
# task executed in a worker process
def task(value):
# get the current process
process = current_process()
# report a message
print(f'Worker is {process.name} with {value}', flush=True)
# block for a moment
sleep(1)
# protect the entry point
if __name__ == '__main__':
# create and configure the process pool
with Pool(2, maxtasksperchild=3) as pool:
# issue tasks to the process pool
for i in range(10):
pool.apply_async(task, args=(i,))
# close the process pool
pool.close()
# wait for all tasks to complete
pool.join()