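I'm trying to write a submitter for a job scheduler. Since I don't know when jobs will arrive or how long they will run, I use multiprocessing to spawn one process per job; each process starts the job with subprocess and detaches it, so that the next job can be handled. This works fine so far, but I'd like to get the return code once a job has finished. Is that possible? I tried several subprocess variants, but the ones that return the RC block the process while the job is running.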
#!/usr/bin/python3
# coding=utf-8
import time
import multiprocessing
import subprocess

JobsList = []


def SubmitJob(jobname):
    """ Submit the next requested job """
    print(f"Starting job {jobname}...")
    JobDir ="/home/xxxxx/Jobs/"
    JobMem = "{}{}.sh".format(JobDir, jobname)
    SysoutFile = "./Sysout/{}.out".format(jobname)
    fh = open(SysoutFile, 'w')
    kwargs = {}
    kwargs.update(start_new_session=True)  # detach the job from this session
    p = subprocess.Popen(JobMem, shell = False, stdout = fh, **kwargs)
    pid = p.pid
    print(f"Job {jobname} pid {pid} submitted...")


def PrepareSubmit():
    """ Create and start one process per job """
    jobs = []
    # Iterate over a copy: removing items from the list being iterated skips entries
    for Job in list(JobsList):
        process = multiprocessing.Process(target=SubmitJob,
                                          args=(Job,))
        jobs.append(process)
        JobsList.remove(Job)
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print("All jobs submitted...")


def main():
    """ Check queue for new job requests """
    number_of_lines = 0
    jobs_list = []
    while 1:
        job_queue = open("/home/xxxxx/Development/Python/#Projects/Scheduler/jobs.que", 'r')
        lines = job_queue.readlines()
        if len(lines) > number_of_lines:
            jobs_list.append(lines[len(lines)-1])
            NewJob = lines[len(lines)-1][:-1]  # strip the trailing newline
            JobsList.append(NewJob)
            PrepareSubmit()
            number_of_lines = number_of_lines+1
        time.sleep(1)


if __name__ == "__main__":
    main()
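The while loop in main() is for testing purposes only.
Can someone tell me whether this is possible, and how? Thanks in advance.
This is the code that gives me the return code, but it doesn't submit a job until the previous one has finished. So if I have a long-running job, it delays the submission of the following jobs, which is what I call blocking.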
def Submit(job):
    """ Submit the next requested job """
    print(f"Starting job {job}...")
    JobDir ="/home/uwe/Jobs/"
    JobMem = "{}{}.sh".format(JobDir, job)
    SysoutFile = "./Sysout/{}.out".format(job)
    fh = open(SysoutFile, 'w')
    kwargs = {}
    kwargs.update(start_new_session=True)
    p = subprocess.Popen(JobMem, shell = False, stdout = fh, **kwargs)
    pid = p.pid
    # Polling here keeps Submit() from returning until the job has ended
    while p.poll() is None:
        a = p.poll()
        print(a)
        time.sleep(1)
    else:
        rc = p.returncode
        print(f"PID: {pid} rc: {rc}")


def main():
    JobsList = ['JOB90501','JOB00001','JOB00002','JOB00003']
    for Job in JobsList:
        Submit(Job)
Roy, this is my current code after your last hint:
ProcessList = {}  # maps each Popen object to its pid


def SubmitJob(jobname):
    """ Submit the next requested job """
    JobDir ="/home/uwe/Jobs/"
    JobMem = "{}{}.sh".format(JobDir, jobname)
    SysoutFile = "./Sysout/{}.out".format(jobname)
    fh = open(SysoutFile, 'w')
    kwargs = {}
    kwargs.update(start_new_session=True)
    p = subprocess.Popen(JobMem, shell = False, stdout = fh, **kwargs)
    ProcessList[p] = p.pid
    print(f"Started job {jobname} - PID: {p.pid}")


def main():
    c_JobsList = ['JOB00001','JOB00002','JOB00003']
    # Start all jobs first, without waiting for any of them
    for Job in c_JobsList:
        SubmitJob(Job)
    # Then collect the return code of each job
    for p, pid in ProcessList.items():
        RcFile = "./Sysout/{}.rc".format(pid)
        f = open(RcFile, 'w')
        while p.poll() is None:
            a = p.poll()
            time.sleep(1)
        else:
            rc = p.returncode
            f.write(str(rc))
            print(f"PID: {pid} rc: {rc}")
        f.close()
And the output:
Started job JOB00001 - PID: 5426
Started job JOB00002 - PID: 5427
Started job JOB00003 - PID: 5429
PID: 5426 rc: 0
PID: 5427 rc: 0
PID: 5429 rc: 8
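EDIT (original answer kept below for future reference)
The natural tool for this purpose is Popen.poll, but apparently it does not work in some cases (see https://lists.gt.net/python/bugs/633489). The solution I would suggest is to use Popen.wait with a very short timeout, as in the following code sample: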
import subprocess
import time

p = subprocess.Popen(["/bin/sleep", "3"])
print(f"Created process {p.pid}")

count = 0
while True:
    try:
        ret = p.wait(.001) # wait for 1 ms
        print(f"Got a return code {ret}")
        break
    except subprocess.TimeoutExpired as e:
        print("..", end = "")

    time.sleep(.5)
    print(f"Still waiting, count is {count}")
    count += 1

print ("Done!")
The output I get is:
Created process 30040
..Still waiting, count is 0
..Still waiting, count is 1
..Still waiting, count is 2
..Still waiting, count is 3
..Still waiting, count is 4
..Still waiting, count is 5
Got a return code 0
Done!
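If several jobs need to be checked this way, the try/except can be wrapped in a small helper. This is only a sketch: the name try_wait and its default timeout are made up for illustration, and the child process is a stand-in Python one-liner rather than a real job script.

```python
import subprocess
import sys


def try_wait(proc, timeout=0.001):
    """Return the exit code if `proc` has finished, otherwise None.

    `try_wait` is a hypothetical helper name; it only wraps Popen.wait
    with a short timeout so the caller is never blocked for long.
    """
    try:
        return proc.wait(timeout)
    except subprocess.TimeoutExpired:
        return None


if __name__ == "__main__":
    # Stand-in job: a child Python process that sleeps for half a second.
    job = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(0.5)"])
    print(try_wait(job))   # the job is most likely still running here
    job.wait()             # block only for the purposes of this demo
    print(try_wait(job))   # the return code is now available
```

Calling the helper again after the process has ended simply returns the stored return code, so it is safe to call repeatedly from a scheduler loop.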
Original idea - Popen.poll
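The method you should use is Popen.poll (documentation). It returns the exit status of the process, or None if the process is still running.
To use it, you have to keep the "popen" objects you get when calling subprocess.Popen, and poll those objects at some later time.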
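A minimal sketch of that idea, assuming the jobs can be started directly from the main loop rather than from separate multiprocessing workers. The names reap_finished, run_demo and the running dict are made up for illustration, and the jobs are stand-in Python one-liners, not the real job scripts.

```python
import subprocess
import sys
import time


def reap_finished(running):
    """Poll every tracked Popen object once, without blocking.

    `running` maps a job name to its Popen object (hypothetical structure).
    Finished jobs are removed from `running`; their return codes are
    returned as a {job name: return code} dict.
    """
    finished = {}
    for name, proc in list(running.items()):
        rc = proc.poll()  # None while the process is still running
        if rc is not None:
            finished[name] = rc
            del running[name]
    return finished


def run_demo():
    # Stand-in jobs: each sleeps briefly, then exits with a chosen code.
    specs = {"JOB00001": (0.2, 0), "JOB00002": (0.1, 0), "JOB00003": (0.3, 8)}
    running = {}
    for name, (delay, code) in specs.items():
        script = f"import time, sys; time.sleep({delay}); sys.exit({code})"
        running[name] = subprocess.Popen([sys.executable, "-c", script],
                                         start_new_session=True)

    results = {}
    while running:
        results.update(reap_finished(running))
        time.sleep(0.05)  # a real scheduler could check its job queue here
    return results


if __name__ == "__main__":
    for name, rc in sorted(run_demo().items()):
        print(f"{name} rc: {rc}")
```

Because each pass over the dict only calls poll, a long-running job never delays the submission of the next one; its return code is simply picked up on a later pass.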