import shlex
from typing import List
from dargs import Argument
from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import (
RetrySignal,
customized_script_header_template,
retry,
)
lsf_script_header_template = """\
#!/bin/bash -l
#BSUB -e %J.err
#BSUB -o %J.out
{lsf_nodes_line}
{lsf_ptile_line}
{lsf_partition_line}
{lsf_number_gpu_line}"""
class LSF(Machine):
"""LSF batch."""
def gen_script(self, job):
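        # The LSF-specific part lives in the header template above; the parent
        # Machine.gen_script assembles the full script (header, environment,
        # commands, end) around it.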
lsf_script = super().gen_script(job)
return lsf_script
@retry()
def do_submit(self, job):
script_file_name = job.script_file_name
script_str = self.gen_script(job)
job_id_name = job.job_hash + "_job_id"
self.context.write_file(fname=script_file_name, write_str=script_str)
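        # Besides the submission script itself, the task commands are written
        # to a companion "<script>.run" file in the remote root.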
script_run_str = self.gen_script_command(job)
script_run_file_name = f"{job.script_file_name}.run"
self.context.write_file(fname=script_run_file_name, write_str=script_run_str)
try:
stdin, stdout, stderr = self.context.block_checkcall(
"cd {} && {} {}".format(
shlex.quote(self.context.remote_root),
"bsub < ",
shlex.quote(script_file_name),
)
)
except RuntimeError as err:
raise RetrySignal(err) from err
subret = stdout.readlines()
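        # bsub answers with a line such as
        #   Job <12345> is submitted to queue <normal>.
        # The second token is "<12345>"; strip the angle brackets to get the id.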
job_id = subret[0].split()[1][1:-1]
self.context.write_file(job_id_name, job_id)
return job_id
# TODO: derive abstract methods
def default_resources(self, resources):
pass
def sub_script_cmd(self, res):
pass
def sub_script_head(self, res):
pass
@retry()
def check_status(self, job):
try:
job_id = job.job_id
except AttributeError:
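            # A job without a job_id attribute was never registered with LSF;
            # report it as terminated.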
return JobStatus.terminated
if job_id == "":
return JobStatus.unsubmitted
ret, stdin, stdout, stderr = self.context.block_call("bjobs " + job_id)
err_str = stderr.read().decode("utf-8")
if (f"Job <{job_id}> is not found") in err_str:
if self.check_finish_tag(job):
return JobStatus.finished
else:
return JobStatus.terminated
elif ret != 0:
            # Just retry when any unknown error is raised.
raise RetrySignal(
f"Get error code {ret} in checking status with job: {job.job_hash} . message: {err_str}"
)
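        # bjobs prints a header row followed by one line per job, e.g.
        #   JOBID   USER  STAT  QUEUE ...
        #   12345   user  RUN   normal ...
        # so the job state is the third column of the second line.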
status_out = stdout.read().decode("utf-8").split("\n")
if len(status_out) < 2:
return JobStatus.unknown
else:
status_line = status_out[1]
status_word = status_line.split()[2]
# ref: https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
if status_word in ["PEND", "WAIT", "PSUSP"]:
return JobStatus.waiting
elif status_word in ["RUN", "USUSP"]:
return JobStatus.running
elif status_word in ["DONE", "EXIT"]:
if self.check_finish_tag(job):
dlog.info(f"job: {job.job_hash} {job.job_id} finished")
return JobStatus.finished
else:
return JobStatus.terminated
else:
return JobStatus.unknown
def check_finish_tag(self, job):
job_tag_finished = job.job_hash + "_job_tag_finished"
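        # The generated job script is expected to touch
        # "<job_hash>_job_tag_finished" on the remote side once all commands
        # have completed successfully.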
return self.context.check_file_exists(job_tag_finished)
@classmethod
def resources_subfields(cls) -> List[Argument]:
"""Generate the resources subfields.
Returns
-------
list[Argument]
resources subfields
"""
doc_custom_gpu_line = "Custom GPU configuration, starting with #BSUB"
doc_gpu_usage = "Choosing if GPU is used in the calculation step. "
doc_gpu_new_syntax = (
"For LFS >= 10.1.0.3, new option -gpu for #BSUB could be used. "
"If False, and old syntax would be used."
)
doc_gpu_exclusive = (
"Only take effect when new syntax enabled. "
"Control whether submit tasks in exclusive way for GPU."
)
return [
Argument(
"kwargs",
dict,
[
Argument(
"gpu_usage",
bool,
optional=True,
default=False,
doc=doc_gpu_usage,
),
Argument(
"gpu_new_syntax",
bool,
optional=True,
default=False,
doc=doc_gpu_new_syntax,
),
Argument(
"gpu_exclusive",
bool,
optional=True,
default=True,
doc=doc_gpu_exclusive,
),
Argument(
"custom_gpu_line",
str,
optional=True,
default=None,
doc=doc_custom_gpu_line,
),
],
optional=False,
doc="Extra arguments.",
)
]
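    # Illustrative only: these options are passed through the "kwargs" field of
    # the Resources object, for example
    #   kwargs={"gpu_usage": True, "gpu_new_syntax": True, "gpu_exclusive": False}
    # (the surrounding Resources fields follow the usual dpdispatcher settings).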
def kill(self, job):
"""Kill the job.
Parameters
----------
job : Job
job
"""
job_id = job.job_id
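        # Ask LSF to terminate the job with bkill; the return code is not
        # checked here.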
ret, stdin, stdout, stderr = self.context.block_call("bkill " + str(job_id))