# Source code for dpdispatcher.lsf

import re
import shlex
import time
from typing import List

from dargs import Argument

from dpdispatcher.machine import Machine
from dpdispatcher import dlog
from dpdispatcher.JobStatus import JobStatus
from dpdispatcher.utils import retry, RetrySignal


# Header placed at the top of every generated LSF job script.
# The four ``{...}`` fields are filled in with ``str.format``; the ``%J``
# tokens are passed through untouched (per the bsub documentation LSF
# substitutes the job ID for them in the -e / -o file names).
lsf_script_header_template = (
    "#!/bin/bash -l\n"
    "#BSUB -e %J.err\n"
    "#BSUB -o %J.out\n"
    "{lsf_nodes_line}\n"
    "{lsf_ptile_line}\n"
    "{lsf_partition_line}\n"
    "{lsf_number_gpu_line}\n"
)


class LSF(Machine):
    """LSF batch system.

    Builds ``#BSUB`` script headers from a job's resources, submits the
    script with ``bsub`` and polls the job state with ``bjobs``, running
    every command through ``self.context``.
    """

    def gen_script(self, job):
        """Return the job script text produced by the parent ``gen_script``."""
        return super().gen_script(job)

    def gen_script_header(self, job):
        """Build the ``#BSUB`` header for ``job``.

        Parameters
        ----------
        job
            Job whose ``resources`` supply ``number_node``, ``cpu_per_node``,
            ``queue_name``, ``gpu_per_node`` and the ``kwargs`` dict.

        Returns
        -------
        str
            ``lsf_script_header_template`` with all fields filled in.
        """
        resources = job.resources
        script_header_dict = {
            # -n takes the total core count over all nodes.
            'lsf_nodes_line': "#BSUB -n {number_cores}".format(
                number_cores=resources.number_node * resources.cpu_per_node),
            'lsf_ptile_line': "#BSUB -R 'span[ptile={cpu_per_node}]'".format(
                cpu_per_node=resources.cpu_per_node),
            'lsf_partition_line': "#BSUB -q {queue_name}".format(
                queue_name=resources.queue_name),
            'lsf_number_gpu_line': self._gen_gpu_line(resources),
        }
        return lsf_script_header_template.format(**script_header_dict)

    @staticmethod
    def _gen_gpu_line(resources):
        """Return the GPU request line for the script header (may be empty).

        A non-empty ``custom_gpu_line`` in ``resources.kwargs`` is used
        verbatim. Otherwise a line is generated only when ``gpu_usage`` is
        exactly ``True``; ``gpu_new_syntax`` selects between the ``-gpu``
        option and the ``-R`` resource string, and ``gpu_exclusive`` sets
        ``j_exclusive`` for the ``-gpu`` form.
        """
        kwargs = resources.kwargs
        custom_gpu_line = kwargs.get("custom_gpu_line", None)
        if custom_gpu_line:
            return custom_gpu_line
        # The flags are compared by identity on purpose: only a literal True
        # enables a feature, truthy non-bool values do not.
        if kwargs.get('gpu_usage', False) is not True:
            return ""
        if kwargs.get('gpu_new_syntax', False) is True:
            j_exclusive = "yes" if kwargs.get('gpu_exclusive', True) is True else "no"
            return "#BSUB -gpu 'num={gpu_per_node}:mode=shared:j_exclusive={j_exclusive}'".format(
                gpu_per_node=resources.gpu_per_node, j_exclusive=j_exclusive)
        return '#BSUB -R "select[ngpus >0] rusage[ngpus_excl_p={gpu_per_node}]"'.format(
            gpu_per_node=resources.gpu_per_node)

    @retry()
    def do_submit(self, job):
        """Write the job script, submit it with ``bsub`` and record the job ID.

        Parameters
        ----------
        job
            Job providing ``script_file_name`` and ``job_hash``.

        Returns
        -------
        str
            The job ID reported by ``bsub``. It is also written to the file
            ``<job_hash>_job_id`` through ``self.context``.

        Raises
        ------
        RetrySignal
            If the submission command raises ``RuntimeError``.
        RuntimeError
            If the command succeeded but no ``Job <id>`` marker is present in
            its output. This is not turned into a ``RetrySignal`` because the
            job may already have been accepted by the scheduler.
        """
        script_file_name = job.script_file_name
        script_str = self.gen_script(job)
        job_id_name = job.job_hash + '_job_id'
        self.context.write_file(fname=script_file_name, write_str=script_str)
        # Quote both paths so that spaces or shell metacharacters in them
        # cannot break (or inject into) the command line.
        submit_cmd = 'cd %s && bsub < %s' % (
            shlex.quote(self.context.remote_root),
            shlex.quote(script_file_name),
        )
        try:
            stdin, stdout, stderr = self.context.block_checkcall(submit_cmd)
        except RuntimeError as err:
            raise RetrySignal(err) from err
        subret = stdout.readlines()
        # Look for the "Job <id>" marker on any line instead of assuming it
        # is the second word of the first line, so that empty output or
        # leading banner lines are handled explicitly.
        job_id = None
        for line in subret:
            found = re.search(r"Job <([^>]+)>", line)
            if found is not None:
                job_id = found.group(1)
                break
        if job_id is None:
            raise RuntimeError(
                "Cannot find job id in bsub output for job %s: %r"
                % (job.job_hash, subret))
        self.context.write_file(job_id_name, job_id)
        return job_id

    # TODO: derive abstract methods
    def default_resources(self, resources):
        """Not implemented for LSF; does nothing and returns ``None``."""

    def sub_script_cmd(self, res):
        """Not implemented for LSF; does nothing and returns ``None``."""

    def sub_script_head(self, res):
        """Not implemented for LSF; does nothing and returns ``None``."""

    @retry()
    def check_status(self, job):
        """Query ``bjobs`` and map the result onto a ``JobStatus``.

        Parameters
        ----------
        job
            Job providing ``job_id`` and ``job_hash``.

        Returns
        -------
        JobStatus
            ``terminated`` if ``job`` has no ``job_id`` attribute,
            ``unsubmitted`` if the ID is empty, otherwise the status derived
            from the ``bjobs`` output; ``unknown`` when the output cannot be
            interpreted.

        Raises
        ------
        RetrySignal
            If ``bjobs`` exits with a non-zero code for any reason other than
            the job not being found.
        """
        try:
            job_id = job.job_id
        except AttributeError:
            return JobStatus.terminated
        if job_id == "":
            return JobStatus.unsubmitted
        ret, _stdin, stdout, stderr = self.context.block_call("bjobs " + job_id)
        err_str = stderr.read().decode('utf-8')
        if ("Job <%s> is not found" % job_id) in err_str:
            # The scheduler no longer knows the job; the finish tag decides
            # whether it completed.
            if self.check_finish_tag(job):
                return JobStatus.finished
            return JobStatus.terminated
        if ret != 0:
            # just retry when any unknown error raised.
            raise RetrySignal(
                "Get error code %d in checking status through ssh with job: %s . message: %s"
                % (ret, job.job_hash, err_str))
        status_out = stdout.read().decode('utf-8').split('\n')
        if len(status_out) < 2:
            return JobStatus.unknown
        # The second line is taken as the job row (the first being the column
        # header); its third field is read as the state.
        status_fields = status_out[1].split()
        if len(status_fields) < 3:
            # Blank or truncated job row: report unknown rather than crash.
            return JobStatus.unknown
        status_word = status_fields[2]
        # ref: https://www.ibm.com/support/knowledgecenter/en/SSETD4_9.1.2/lsf_command_ref/bjobs.1.html
        if status_word in ("PEND", "WAIT", "PSUSP"):
            return JobStatus.waiting
        if status_word in ("RUN", "USUSP"):
            return JobStatus.running
        if status_word in ("DONE", "EXIT"):
            if self.check_finish_tag(job):
                dlog.info(f"job: {job.job_hash} {job.job_id} finished")
                return JobStatus.finished
            return JobStatus.terminated
        return JobStatus.unknown

    def check_finish_tag(self, job):
        """Return whether the file ``<job_hash>_job_tag_finished`` exists.

        The check is delegated to ``self.context.check_file_exists``.
        """
        job_tag_finished = job.job_hash + '_job_tag_finished'
        return self.context.check_file_exists(job_tag_finished)

    @classmethod
    def resources_subfields(cls) -> List[Argument]:
        """Generate the resources subfields.

        Returns
        -------
        list[Argument]
            resources subfields
        """
        doc_custom_gpu_line = "Custom GPU configuration, starting with #BSUB"
        doc_gpu_usage = "Choosing if GPU is used in the calculation step. "
        doc_gpu_new_syntax = "For LSF >= 10.1.0.3, new option -gpu for #BSUB could be used. " \
                             "If False, the old syntax would be used."
        doc_gpu_exclusive = "Only take effect when new syntax enabled. " \
                            "Control whether submit tasks in exclusive way for GPU."
        return [Argument("kwargs", dict, [
            Argument("gpu_usage", bool, optional=True, default=False, doc=doc_gpu_usage),
            Argument("gpu_new_syntax", bool, optional=True, default=False, doc=doc_gpu_new_syntax),
            Argument("gpu_exclusive", bool, optional=True, default=True, doc=doc_gpu_exclusive),
            Argument("custom_gpu_line", str, optional=True, default=None, doc=doc_custom_gpu_line)
        ], optional=False, doc="Extra arguments.")]