from dpdispatcher.JobStatus import JobStatus
from dpdispatcher import dlog
from dpdispatcher.machine import Machine
# Header placed at the top of generated PBS job scripts. The two placeholders
# ({select_node_line}, {queue_name_line}) are filled in with str.format-style
# substitution. The value deliberately starts with a newline, exactly as the
# triple-quoted form of this template did.
pbs_script_header_template = (
    "\n"
    "#!/bin/bash -l\n"
    "{select_node_line}\n"
    "#PBS -j oe\n"
    "{queue_name_line}\n"
)
class PBS(Machine):
    """Batch backend that submits jobs with ``qsub`` and polls them with ``qstat -x``.

    All remote file and command operations go through ``self.context``.
    """

    def gen_script(self, job):
        """Return the submission script text for *job*.

        Delegates unchanged to ``Machine.gen_script``; kept as an explicit
        override point for subclasses.
        """
        return super().gen_script(job)

    def do_submit(self, job):
        """Write the job script, submit it with ``qsub`` and return the job id.

        Parameters
        ----------
        job
            Object providing ``script_file_name`` and ``job_hash``.

        Returns
        -------
        The first whitespace-delimited token of the first line printed by
        ``qsub``. The same value is also written to the file
        ``<job_hash>_job_id`` through the context.

        Raises
        ------
        RuntimeError
            If ``qsub`` printed nothing from which a job id can be taken.
        """
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + '_job_id'
        self.context.write_file(fname=script_file_name, write_str=script_str)

        # qsub is run from the remote root so the script name resolves there.
        script_file_dir = self.context.remote_root
        _, stdout, _ = self.context.block_checkcall(
            'cd %s && %s %s' % (script_file_dir, 'qsub', script_file_name))

        subret = stdout.readlines()
        first_line_fields = subret[0].split() if subret else []
        if not first_line_fields:
            # Fail with a clear message instead of an IndexError on empty output.
            raise RuntimeError(
                "qsub returned no job id for job %s" % job.job_hash)
        job_id = first_line_fields[0]
        self.context.write_file(job_id_name, job_id)
        return job_id

    def default_resources(self, resources):
        """Placeholder hook: does nothing and returns ``None``."""
        pass

    def check_status(self, job):
        """Query ``qstat -x`` and translate the result into a ``JobStatus``.

        Parameters
        ----------
        job
            Object providing ``job_id`` and ``job_hash``.

        Returns
        -------
        JobStatus
            ``unsubmitted`` when ``job.job_id`` is the empty string;
            ``waiting`` for state ``Q``/``H``; ``running`` for ``R``;
            ``finished`` or ``terminated`` (depending on the finish tag file)
            for ``C``/``E``/``K``/``F`` or when qstat reports the job as
            unknown/finished; ``unknown`` for any other state or when the
            qstat output cannot be parsed.

        Raises
        ------
        RuntimeError
            If qstat exits non-zero for a reason other than an unknown or
            already finished job.
        """
        job_id = job.job_id
        if job_id == "":
            return JobStatus.unsubmitted

        ret, _, stdout, stderr = self.context.block_call("qstat -x " + job_id)
        err_str = stderr.read().decode('utf-8')
        if ret != 0:
            if "qstat: Unknown Job Id" in err_str or "Job has finished" in err_str:
                # The scheduler no longer tracks the job; the tag file decides.
                if self.check_finish_tag(job=job):
                    return JobStatus.finished
                return JobStatus.terminated
            raise RuntimeError("status command qstat fails to execute. erro info: %s return code %d"
                               % (err_str, ret))

        # The job row is the last non-empty line of the listing; the state
        # letter is its second-to-last column. Tolerate missing/extra trailing
        # newlines and short rows rather than raising IndexError.
        rows = [line for line in stdout.read().decode('utf-8').splitlines()
                if line.strip()]
        fields = rows[-1].split() if rows else []
        if len(fields) < 2:
            return JobStatus.unknown
        status_word = fields[-2]

        if status_word in ["Q", "H"]:
            return JobStatus.waiting
        if status_word in ["R"]:
            return JobStatus.running
        if status_word in ["C", "E", "K", "F"]:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            return JobStatus.terminated
        return JobStatus.unknown

    def check_finish_tag(self, job):
        """Return whether the file ``<job_hash>_job_tag_finished`` exists in the context."""
        job_tag_finished = job.job_hash + '_job_tag_finished'
        return self.context.check_file_exists(job_tag_finished)
class Torque(PBS):
    """Variant of :class:`PBS` that polls job state with ``qstat -l``.

    Everything except ``check_status`` is inherited from ``PBS``.
    """

    def check_status(self, job):
        """Query ``qstat -l`` and translate the result into a ``JobStatus``.

        Parameters
        ----------
        job
            Object providing ``job_id`` and ``job_hash``.

        Returns
        -------
        JobStatus
            ``unsubmitted`` when ``job.job_id`` is the empty string;
            ``waiting`` for state ``Q``/``H``; ``running`` for ``R``;
            ``finished`` or ``terminated`` (depending on the finish tag file)
            for ``C``/``E``/``K``/``F`` or when qstat reports the job as
            unknown/finished; ``unknown`` for any other state or when the
            qstat output cannot be parsed.

        Raises
        ------
        RuntimeError
            If qstat exits non-zero for a reason other than an unknown or
            already finished job.
        """
        job_id = job.job_id
        if job_id == "":
            return JobStatus.unsubmitted

        ret, _, stdout, stderr = self.context.block_call("qstat -l " + job_id)
        err_str = stderr.read().decode('utf-8')
        if ret != 0:
            if "qstat: Unknown Job Id" in err_str or "Job has finished" in err_str:
                # The scheduler no longer tracks the job; the tag file decides.
                if self.check_finish_tag(job=job):
                    return JobStatus.finished
                return JobStatus.terminated
            raise RuntimeError("status command qstat fails to execute. erro info: %s return code %d"
                               % (err_str, ret))

        # The job row is the last non-empty line of the listing; the state
        # letter is its second-to-last column. Tolerate missing/extra trailing
        # newlines and short rows rather than raising IndexError.
        rows = [line for line in stdout.read().decode('utf-8').splitlines()
                if line.strip()]
        fields = rows[-1].split() if rows else []
        if len(fields) < 2:
            return JobStatus.unknown
        status_word = fields[-2]

        if status_word in ["Q", "H"]:
            return JobStatus.waiting
        if status_word in ["R"]:
            return JobStatus.running
        if status_word in ["C", "E", "K", "F"]:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            return JobStatus.terminated
        return JobStatus.unknown