如何在 python 中通过添加内核后立即添加进程来执行批量计算?



Bash有函数"wait -n",它可以以相对简单的方式用于停止子进程的后续执行,直到一定数量的处理器内核可用。 例如,我可以执行以下操作,

for IJOB in IJOBRANGE;
do
./func.x ${IJOB}
# checking the number of background processes
# and halting the execution accordingly
bground=( $(jobs -p) );
if (( ${#bground[@]} >= CORES )); then
wait -n
fi
done || exit 1

此代码段可以使用不同的参数批量执行任意 C 进程"func.x",并始终维护子进程的固定数量的并行实例,设置为值"CORES"。

我想知道是否可以使用 python 脚本完成类似的事情,并且 Python 子进程(或函数)。目前,我定义了一个 python 函数,设置了一个一维参数数组,并使用 python 多处理模块中的 Pool 例程在参数数组上并行计算函数。池函数对我的函数执行一定数量的评估(以下示例中的 CPU CORES # 个),并等到生成进程的所有实例都结束,然后再移动到下一批。

import multiprocessing as mp
def func(x):
# some computation with x
def main(j):
# setting the parameter array
xarray = range(j)
pool = mp.Pool()
pool.map(func,xarray)

我想知道是否可以修改此代码片段,以便始终对我的子例程执行固定数量的并行计算,即在其中一个子进程完成后立即添加另一个进程。这里所有的"func"进程都应该是独立的,执行顺序也无关紧要。我是 python 方式的新手,如果有一些有用的观点,那就太好了。

在评论中的讨论之后,这里有一些改编自您的测试代码,表明Pool在将新任务分配给可用工作线程之前不会等待所有并行任务完成:

import multiprocessing as mp
from time import sleep, time

def func(x):
"""sleeps for x seconds"""
name = mp.current_process().name
print("{} {}: sleep {}".format(time(), name, x))
sleep(x)
print("{} {}: done sleeping".format(time(), name))

def main():
# A pool of two processes, for the sake of simplicity
pool = mp.Pool(processes=2)
# Here's how that works out visually:
#
#    0s        1s       2s        3s
# P1 [sleep(1)][     sleep(2)     ]
# P2 [     sleep(2)     ][sleep(1)]
sleeps = [1, 2, 2, 1]
pool.map(func, sleeps)

if __name__ == "__main__":
main()

运行此代码会给出(为清楚起见,简化了时间戳):

$ python3 mp.py 
0s: ForkPoolWorker-1: sleep 1
0s: ForkPoolWorker-2: sleep 2
1s: ForkPoolWorker-1: done sleeping
1s: ForkPoolWorker-1: sleep 2
2s: ForkPoolWorker-2: done sleeping
2s: ForkPoolWorker-2: sleep 1
3s: ForkPoolWorker-1: done sleeping
3s: ForkPoolWorker-2: done sleeping

我们可以看到,第一个进程在开始第二个任务之前不会等待第二个进程完成其第一个任务。

所以我想这应该回答你提出的问题,希望我已经清楚地理解了你。

相关内容

  • 没有找到相关文章

最新更新