Source code for dpdispatcher.machines.fugaku
import shlex
from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import customized_script_header_template
fugaku_script_header_template = """\
{queue_name_line}
{fugaku_node_number_line}
{fugaku_ntasks_per_node_line}
"""
[docs]
class Fugaku(Machine):
[docs]
def gen_script(self, job):
fugaku_script = super().gen_script(job)
return fugaku_script
[docs]
def do_submit(self, job):
script_file_name = job.script_file_name
script_str = self.gen_script(job)
job_id_name = job.job_hash + "_job_id"
# script_str = self.sub_script(job_dirs, cmd, args=args, resources=resources, outlog=outlog, errlog=errlog)
self.context.write_file(fname=script_file_name, write_str=script_str)
script_run_str = self.gen_script_command(job)
script_run_file_name = f"{job.script_file_name}.run"
self.context.write_file(fname=script_run_file_name, write_str=script_run_str)
# self.context.write_file(fname=os.path.join(self.context.submission.work_base, script_file_name), write_str=script_str)
# script_file_dir = os.path.join(self.context.submission.work_base)
script_file_dir = self.context.remote_root
# stdin, stdout, stderr = self.context.block_checkcall('cd %s && %s %s' % (self.context.remote_root, 'pjsub', script_file_name))
stdin, stdout, stderr = self.context.block_checkcall(
"cd {} && {} {}".format(
shlex.quote(script_file_dir), "pjsub", shlex.quote(script_file_name)
)
)
subret = stdout.readlines()
job_id = subret[0].split()[5]
self.context.write_file(job_id_name, job_id)
return job_id
[docs]
def default_resources(self, resources):
pass
[docs]
def check_status(self, job):
job_id = job.job_id
if job_id == "":
return JobStatus.unsubmitted
ret, stdin, stdout, stderr = self.context.block_call("pjstat " + job_id)
err_str = stderr.read().decode("utf-8")
try:
status_line = stdout.read().decode("utf-8").split("\n")[-2]
# pjstat only retrun 0 if the job is not waiting or running
except Exception:
ret, stdin, stdout, stderr = self.context.block_call("pjstat -H " + job_id)
status_line = stdout.read().decode("utf-8").split("\n")[-2]
status_word = status_line.split()[3]
if status_word in ["EXT", "CCL", "ERR"]:
if self.check_finish_tag(job):
dlog.info(f"job: {job.job_hash} {job.job_id} finished")
return JobStatus.finished
else:
return JobStatus.terminated
else:
return JobStatus.unknown
status_word = status_line.split()[3]
# dlog.info (status_word)
if status_word in ["QUE", "HLD", "RNA", "SPD"]:
return JobStatus.waiting
elif status_word in ["RUN", "RNE"]:
return JobStatus.running
else:
return JobStatus.unknown
[docs]
def check_finish_tag(self, job):
job_tag_finished = job.job_hash + "_job_tag_finished"
return self.context.check_file_exists(job_tag_finished)