Source code for dpdispatcher.machines.lsf

import shlex
from typing import List

from dargs import Argument

from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.utils import (
    RetrySignal,
    customized_script_header_template,
    retry,
)

# Default header of a generated LSF submission script, one directive per line.
# The ``{...}`` fields are filled in by ``LSF.gen_script_header`` through
# ``str.format``; ``%J`` is passed to LSF verbatim (per the bsub documentation
# it is expanded to the job ID, giving per-job stdout/stderr files).
lsf_script_header_template = "\n".join(
    (
        "#!/bin/bash -l",
        "#BSUB -e %J.err",
        "#BSUB -o %J.out",
        "{lsf_nodes_line}",
        "{lsf_ptile_line}",
        "{lsf_partition_line}",
        "{lsf_number_gpu_line}",
    )
)


class LSF(Machine):
    """LSF batch.

    Builds ``#BSUB`` script headers, submits scripts with ``bsub``, polls
    job state with ``bjobs`` and cancels jobs with ``bkill``. All commands
    are executed through ``self.context``.
    """

    def gen_script(self, job):
        """Return the submission script for ``job`` as built by the parent class.

        Parameters
        ----------
        job : Job
            job

        Returns
        -------
        str
            whatever ``Machine.gen_script`` produces for ``job``
        """
        return super().gen_script(job)

    def gen_script_header(self, job):
        """Build the ``#BSUB`` header of the submission script.

        The GPU directive is chosen from ``job.resources.kwargs``:

        * a truthy ``custom_gpu_line`` is used verbatim;
        * otherwise, unless ``gpu_usage`` is exactly ``True``, no GPU line
          is emitted;
        * with ``gpu_new_syntax`` exactly ``True`` a ``-gpu`` directive is
          emitted, with ``j_exclusive=yes`` only when ``gpu_exclusive`` is
          exactly ``True``;
        * otherwise the old ``-R "select[...] rusage[...]"`` form is used.

        If the resources' ``strategy`` names a
        ``customized_script_header_template_file``, the header is rendered
        from that file instead of the built-in template.

        Parameters
        ----------
        job : Job
            job whose ``resources`` describe the request

        Returns
        -------
        str
            the script header
        """
        resources = job.resources
        script_header_dict = {
            "lsf_nodes_line": f"#BSUB -n {resources.number_node * resources.cpu_per_node}",
            "lsf_ptile_line": f"#BSUB -R 'span[ptile={resources.cpu_per_node}]'",
            "lsf_partition_line": f"#BSUB -q {resources.queue_name}",
        }
        gpu_usage_flag = resources.kwargs.get("gpu_usage", False)
        gpu_new_syntax_flag = resources.kwargs.get("gpu_new_syntax", False)
        gpu_exclusive_flag = resources.kwargs.get("gpu_exclusive", True)
        custom_gpu_line = resources.kwargs.get("custom_gpu_line", None)

        # The flags are compared with ``is True`` on purpose: only the real
        # boolean enables a feature, exactly as before.
        if custom_gpu_line:
            gpu_line = custom_gpu_line
        elif gpu_usage_flag is not True:
            gpu_line = ""
        elif gpu_new_syntax_flag is True:
            j_exclusive = "yes" if gpu_exclusive_flag is True else "no"
            gpu_line = (
                f"#BSUB -gpu 'num={resources.gpu_per_node}:mode=shared:"
                f"j_exclusive={j_exclusive}'"
            )
        else:
            gpu_line = (
                '#BSUB -R "select[ngpus >0] rusage['
                f'ngpus_excl_p={resources.gpu_per_node}]"'
            )
        script_header_dict["lsf_number_gpu_line"] = gpu_line

        template_file = resources["strategy"].get(
            "customized_script_header_template_file"
        )
        if template_file is not None:
            return customized_script_header_template(template_file, resources)
        return lsf_script_header_template.format(**script_header_dict)

    @retry()
    def do_submit(self, job):
        """Write the job scripts and submit the job with ``bsub``.

        The script and its ``.run`` companion are written through the
        context, the script is piped into ``bsub`` from the remote root,
        and the parsed job ID is stored in ``<job_hash>_job_id``.

        Parameters
        ----------
        job : Job
            job

        Returns
        -------
        str
            the job ID reported by ``bsub``

        Raises
        ------
        RetrySignal
            if the ``bsub`` call raises ``RuntimeError``
        RuntimeError
            if no job ID can be parsed from the ``bsub`` output
        """
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + "_job_id"
        self.context.write_file(fname=script_file_name, write_str=script_str)
        script_run_str = self.gen_script_command(job)
        script_run_file_name = f"{job.script_file_name}.run"
        self.context.write_file(fname=script_run_file_name, write_str=script_run_str)

        try:
            _, stdout, _ = self.context.block_checkcall(
                "cd {} && {} {}".format(
                    shlex.quote(self.context.remote_root),
                    "bsub < ",
                    shlex.quote(script_file_name),
                )
            )
        except RuntimeError as err:
            raise RetrySignal(err) from err

        subret = stdout.readlines()
        # The ID is the second whitespace-separated token of the first line
        # with its surrounding one-character delimiters removed, i.e. the
        # reply is expected to look like "Job <1234> is submitted to ...".
        try:
            job_id = subret[0].split()[1][1:-1]
        except IndexError as err:
            # Raised outside the ``try`` above, so it is not turned into a
            # RetrySignal here: the submission itself may have succeeded.
            raise RuntimeError(
                f"Unable to parse the job ID from bsub output: {subret!r}"
            ) from err
        self.context.write_file(job_id_name, job_id)
        return job_id

    # TODO: derive abstract methods
    def default_resources(self, resources):
        """Do nothing; placeholder that returns ``None``."""
        pass

    def sub_script_cmd(self, res):
        """Do nothing; placeholder that returns ``None``."""
        pass

    def sub_script_head(self, res):
        """Do nothing; placeholder that returns ``None``."""
        pass

    @retry()
    def check_status(self, job):
        """Query ``bjobs`` and translate the result into a ``JobStatus``.

        Parameters
        ----------
        job : Job
            job

        Returns
        -------
        JobStatus
            * ``terminated`` if ``job`` has no ``job_id`` attribute;
            * ``unsubmitted`` if ``job_id`` is the empty string;
            * ``finished`` / ``terminated`` (depending on the finish tag)
              if ``bjobs`` reports the job as not found, or as ``DONE`` or
              ``EXIT``;
            * ``waiting`` for ``PEND``, ``WAIT``, ``PSUSP``;
            * ``running`` for ``RUN``, ``USUSP``;
            * ``unknown`` for any other state or unparsable output.

        Raises
        ------
        RetrySignal
            if ``bjobs`` exits non-zero for any reason other than the job
            not being found
        """
        try:
            job_id = job.job_id
        except AttributeError:
            return JobStatus.terminated
        if job_id == "":
            return JobStatus.unsubmitted

        # Formatting (rather than ``+``) accepts non-string IDs, consistent
        # with ``kill``, which stringifies the ID as well.
        ret, _, stdout, stderr = self.context.block_call(f"bjobs {job_id}")
        err_str = stderr.read().decode("utf-8")
        if f"Job <{job_id}> is not found" in err_str:
            if self.check_finish_tag(job):
                return JobStatus.finished
            return JobStatus.terminated
        if ret != 0:
            # just retry when any unknown error raised.
            raise RetrySignal(
                f"Get error code {ret} in checking status with job: "
                f"{job.job_hash} . message: {err_str}"
            )

        status_out = stdout.read().decode("utf-8").split("\n")
        if len(status_out) < 2:
            return JobStatus.unknown
        # The second output line is the job's row; its third column is read
        # as the state. A blank or truncated row cannot be interpreted.
        status_fields = status_out[1].split()
        if len(status_fields) < 3:
            return JobStatus.unknown
        status_word = status_fields[2]

        # ref: https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
        if status_word in ["PEND", "WAIT", "PSUSP"]:
            return JobStatus.waiting
        if status_word in ["RUN", "USUSP"]:
            return JobStatus.running
        if status_word in ["DONE", "EXIT"]:
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            return JobStatus.terminated
        return JobStatus.unknown

    def check_finish_tag(self, job):
        """Return whether the ``<job_hash>_job_tag_finished`` file exists.

        Parameters
        ----------
        job : Job
            job

        Returns
        -------
        the result of ``self.context.check_file_exists`` for the tag file
        """
        job_tag_finished = job.job_hash + "_job_tag_finished"
        return self.context.check_file_exists(job_tag_finished)

    @classmethod
    def resources_subfields(cls) -> List[Argument]:
        """Generate the resources subfields.

        Returns
        -------
        list[Argument]
            resources subfields
        """
        doc_custom_gpu_line = "Custom GPU configuration, starting with #BSUB"
        doc_gpu_usage = "Choosing if GPU is used in the calculation step. "
        doc_gpu_new_syntax = (
            "For LSF >= 10.1.0.3, new option -gpu for #BSUB could be used. "
            "If False, the old syntax would be used."
        )
        doc_gpu_exclusive = (
            "Only take effect when new syntax enabled. "
            "Control whether submit tasks in exclusive way for GPU."
        )
        return [
            Argument(
                "kwargs",
                dict,
                [
                    Argument(
                        "gpu_usage",
                        bool,
                        optional=True,
                        default=False,
                        doc=doc_gpu_usage,
                    ),
                    Argument(
                        "gpu_new_syntax",
                        bool,
                        optional=True,
                        default=False,
                        doc=doc_gpu_new_syntax,
                    ),
                    Argument(
                        "gpu_exclusive",
                        bool,
                        optional=True,
                        default=True,
                        doc=doc_gpu_exclusive,
                    ),
                    Argument(
                        "custom_gpu_line",
                        str,
                        optional=True,
                        default=None,
                        doc=doc_custom_gpu_line,
                    ),
                ],
                optional=False,
                doc="Extra arguments.",
            )
        ]

    def kill(self, job):
        """Kill the job.

        Runs ``bkill`` on the job's ID. The exit status and output of
        ``bkill`` are not inspected.

        Parameters
        ----------
        job : Job
            job
        """
        job_id = job.job_id
        self.context.block_call("bkill " + str(job_id))