How can I kill a running job without closing the connection to its core? (currently using execnet)



I have a computer cluster that uses a master node to communicate with the slave nodes in the cluster.

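The main problem I'm facing is using execnet to kill certain running jobs and then have a new job requeued on the same core where the killed job was running (because I want to use all cores of the slave nodes at any given time).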

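So far there is no way to terminate a running job with execnet, so I figured I could kill the job manually from a bash script, e.g. `sudo kill 12345`, where 12345 is the job's PID (getting the PID of each job is another thing execnet doesn't support, but that's another topic), and then requeue another job on the core that was just freed. It does kill the job correctly, but it also closes the connection to that channel (i.e. that core; the master communicates with each core individually), and the core is then not used again until all jobs have finished. Is there a way to terminate a running job without terminating the connection to the core?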

This is the script that submits the jobs:

```python
import execnet, os, sys
import re
import socket
import numpy as np
import pickle, cPickle
from copy import deepcopy
import time
import job

def main():
    print 'execnet source files are located at:\n  {}/\n'.format(
        os.path.join(os.path.dirname(execnet.__file__))
    )
    # Generate a group of gateways.
    work_dir = '/home/mpiuser/pn2/'
    f = 'cluster_core_info.txt'
    n_start, n_end = 250000, 250008
    ci = get_cluster_info(f)
    group, g_labels = make_gateway_group(ci, work_dir)

    mch = group.remote_exec(job)
    # Each queue item is a (channel, item) pair; when a channel closes,
    # the endmarker 'terminate_channel' is put on the queue instead.
    queue = mch.make_receive_queue(endmarker='terminate_channel')
    args = range(n_start, n_end+1)  # List of parameters to compute factorial.
    manage_jobs(group, mch, queue, g_labels, args)
    # Close the group of gateways.
    group.terminate()

def get_cluster_info(f):
    nodes, ncores = [], []
    with open(f, 'r') as fid:
        while True:
            line = fid.readline()
            if not line:
                fid.close()
                break
            line = line.strip('\n').split()
            nodes.append(line[0])
            ncores.append(int(line[1]))
    return dict( zip(nodes, ncores) )

def make_gateway_group(cluster_info, work_dir):
    ''' Generate gateways on all cores in remote nodes. '''
    print 'Gateways generated:\n'
    group = execnet.Group()
    g_labels = []
    nodes = list(cluster_info.keys())
    for node in nodes:
        for i in range(cluster_info[node]):
            group.makegateway(
                "ssh={0}//id={0}_{1}//chdir={2}".format(
                    node, i, work_dir
                ))
            sys.stdout.write('  ')
            sys.stdout.flush()
            print list(group)[-1]
            # Generate a string 'node-id_core-id'.
            g_labels.append('{}_{}'.format(re.findall(r'\d+', node)[0], i))
    print ''
    return group, g_labels

def get_mch_id(g_labels, string):
    ids = [x for x in re.findall(r'\d+', string)]
    ids = '{}_{}'.format(*ids)
    return g_labels.index(ids)

def manage_jobs(group, mch, queue, g_labels, args):
    args_ref = deepcopy(args)
    terminated_channels = 0
    active_jobs, active_args = [], []
    while True:
        channel, item = queue.get()
        if item == 'terminate_channel':
            terminated_channels += 1
            print "  Gateway closed: {}".format(channel.gateway.id)
            if terminated_channels == len(mch):
                print "\nAll jobs done.\n"
                break
            continue
        if item != "ready":
            mch_id_completed = get_mch_id(g_labels, channel.gateway.id)
            depopulate_list(active_jobs, mch_id_completed, active_args)
            print "  Gateway {} channel id {} returned:".format(
                channel.gateway.id, mch_id_completed)
            print "  {}".format(item)
        if not args:
            print "\nNo more jobs to submit, sending termination request...\n"
            mch.send_each(None)
            args = 'terminate_channel'
        if args and args != 'terminate_channel':
            arg = args.pop(0)
            idx = args_ref.index(arg)
            channel.send(arg)  # arg is copied by value to the remote side of
                               # channel to be executed. Maybe blocked if the
                               # sender queue is full.
            # Get the id of current channel used to submit a job,
            # this id can be used to refer mch[id] to terminate a job later.
            mch_id_active = get_mch_id(g_labels, channel.gateway.id)
            print "Job {}:  {}!  submitted to gateway {}, channel id {}".format(
                idx, arg, channel.gateway.id, mch_id_active)
            populate_list(active_jobs, mch_id_active,
                          active_args, arg)

def populate_list(jobs, job_active, args, arg_active):
    jobs.append(job_active)
    args.append(arg_active)

def depopulate_list(jobs, job_completed, args):
    i = jobs.index(job_completed)
    jobs.pop(i)
    args.pop(i)

if __name__ == '__main__':
    main()
```

This is my job.py script:

```python
#!/usr/bin/env python
import os, sys
import socket
import time
import numpy as np
import pickle, cPickle
import random
import job

def hostname():
    return socket.gethostname()

def working_dir():
    return os.getcwd()

def listdir(path):
    return os.listdir(path)

def fac(arg):
    return np.math.factorial(arg)

def dump(arg):
    path = working_dir() + '/out'
    if not os.path.exists(path):
        os.mkdir(path)
    f_path = path + '/fac_{}.txt'.format(arg)
    t_0 = time.time()
    num = fac(arg)                                    # Main operation
    t_1 = time.time()
    with open(f_path, "wb") as fout:                  # protocol 2 is binary
        cPickle.dump(num, fout, protocol=2)           # Main operation
    t_2 = time.time()
    duration_0 = "{:.4f}".format(t_1 - t_0)
    duration_1 = "{:.4f}".format(t_2 - t_1)
    #num2 = cPickle.load(open(f_path, "rb"))
    return '--Calculation: {} s, dumping: {} s'.format(
        duration_0, duration_1)

if __name__ == '__channelexec__':
    channel.send("ready")
    for arg in channel:
        if arg is None:
            break
        elif str(arg).isdigit():
            channel.send((
                str(arg)+'!',
                job.hostname(),
                job.dump(arg)
            ))
        else:
            print 'Warning! arg sent should be number | None'
```

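Yes, you're on the right track. Use the psutil library to manage the processes: find their PIDs and so on, and kill them. There's no need to involve bash anywhere; Python covers all of it.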
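A minimal sketch of the psutil approach (Python 3, third-party psutil package assumed to be installed). One caveat: in the question's setup the job code runs inside the gateway's own Python process, which would explain why killing that PID also closes the channel. The sketch therefore assumes a different layout (an assumption, not what job.py currently does): each job runs as a child process of the gateway, so killing only the gateway's children leaves the gateway and its channel alive. `kill_children` and `find_pids` are hypothetical helper names.

```python
import os
import subprocess
import sys

import psutil  # third-party package: pip install psutil


def kill_children(parent_pid, grace=3.0):
    """Kill every descendant of parent_pid while leaving parent_pid running.

    Returns the PIDs that were signalled.
    """
    try:
        parent = psutil.Process(parent_pid)
        children = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        return []
    for child in children:
        try:
            child.terminate()              # SIGTERM on POSIX
        except psutil.NoSuchProcess:
            pass                           # already exited
    _, alive = psutil.wait_procs(children, timeout=grace)
    for child in alive:
        try:
            child.kill()                   # SIGKILL for anything that ignored SIGTERM
        except psutil.NoSuchProcess:
            pass
    psutil.wait_procs(alive, timeout=grace)
    return [child.pid for child in children]


def find_pids(pattern):
    """Return the PIDs whose command line contains pattern (similar to `pgrep -f`)."""
    pids = []
    for proc in psutil.process_iter(["pid", "cmdline"]):
        # cmdline is None when access is denied; treat it as empty.
        cmdline = " ".join(proc.info["cmdline"] or [])
        if pattern in cmdline:
            pids.append(proc.info["pid"])
    return pids
```

On a gateway host, `kill_children(os.getpid())` called from the gateway's own code would stop the running job process without touching the gateway itself.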

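Or, better yet, program the script to terminate itself when the master tells it to. That is how it's usually done. If needed, you can even have it start another script before terminating itself. Or, if the new work is the same as what you're doing in the other process, just stop the current job and start a new one inside the script, without terminating the script.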
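The last option ("stop the current job and start a new one without terminating the script") could look roughly like this sketch (Python 3, standard library only). `JobSlot` is a hypothetical helper, and a POSIX host is assumed so the `fork` start method is available. The long-lived worker keeps one slot per core; each job runs in a child process, so cancelling a job kills only that child while the worker, and therefore its channel, stays alive and can take the next job right away.

```python
import multiprocessing

# Assumption: POSIX host, so "fork" is available and the job function does not
# need to be picklable or importable by the child process.
_CTX = multiprocessing.get_context("fork")


def _run(func, arg, results):
    """Child-process entry point: run one job and ship its result back."""
    results.put(func(arg))


class JobSlot:
    """Hypothetical helper: runs at most one job at a time in a child process."""

    def __init__(self):
        self._proc = None
        self._results = None

    def busy(self):
        return self._proc is not None and self._proc.is_alive()

    def start(self, func, arg):
        """Start func(arg) in a child process and return the child's PID."""
        if self.busy():
            raise RuntimeError("a job is already running in this slot")
        self._results = _CTX.Queue()
        self._proc = _CTX.Process(target=_run, args=(func, arg, self._results))
        self._proc.start()
        return self._proc.pid

    def result(self, timeout=None):
        """Block until the running job finishes and return its value."""
        # Read before join(): a child with queued data may not exit until it is consumed.
        value = self._results.get(timeout=timeout)  # raises queue.Empty on timeout
        self._proc.join()
        self._proc = None
        return value

    def cancel(self, grace=3.0):
        """Kill the running job, if any. Returns True if a job was cancelled."""
        proc, self._proc = self._proc, None
        if proc is None:
            return False
        was_running = proc.is_alive()
        if was_running:
            proc.terminate()               # SIGTERM
            proc.join(grace)
            if proc.is_alive():
                proc.kill()                # SIGKILL if the job ignored SIGTERM
        proc.join()                        # reap the child either way
        return was_running
```

Inside the `for arg in channel` loop of job.py, a hypothetical "cancel" message would call `slot.cancel()` and reply "ready", while a number would call `slot.start(dump, arg)`. Because the loop must keep listening while a job runs, it would need to poll; execnet's `Channel.receive()` accepts a timeout, which is one way to do that.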

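Also, if I may make a suggestion: don't read the file line by line. Read the whole file and then use `.splitlines()`. For small files, reading them in pieces only tortures the IO. You don't need `.strip()` either. You should also remove the unused imports.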
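Applied to `get_cluster_info` from the question, the suggestion might look like this sketch. It assumes the file format implied by the original code (one `<node> <ncores>` pair per line); splitting out the hypothetical `parse_cluster_info` helper is just a design choice so the parsing can be checked without a file.

```python
def parse_cluster_info(text):
    """Map each node name to its core count, one '<node> <ncores>' pair per line."""
    info = {}
    for line in text.splitlines():
        # split() with no argument ignores surrounding whitespace, so no strip() is needed.
        fields = line.split()
        if not fields:                 # tolerate blank lines
            continue
        info[fields[0]] = int(fields[1])
    return info


def get_cluster_info(path):
    """Read the whole file in one call, then parse it."""
    with open(path) as fid:
        return parse_cluster_info(fid.read())
```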
