Source code for dpdispatcher.lazy_local_context

from dpdispatcher.base_context import BaseContext
import os
import subprocess as sp

class SPRetObj(object):
    """Minimal file-like wrapper around the bytes captured from a subprocess.

    It exposes ``read`` and ``readlines`` so that captured stdout/stderr can
    be consumed through a stream-style interface.
    """

    def __init__(self, ret):
        """Store the captured output.

        Parameters
        ----------
        ret : bytes
            Raw output as returned by ``Popen.communicate()``.
        """
        self.data = ret

    def read(self):
        """Return the stored output unchanged (still as bytes)."""
        return self.data

    def readlines(self):
        """Return the output as a list of text lines.

        The data is decoded as UTF-8 and split with ``str.splitlines``; every
        returned line is terminated by a single ``'\\n'``, whatever line
        ending the original output used.
        """
        return [line + '\n' for line in self.data.decode('utf-8').splitlines()]
class LazyLocalContext(BaseContext):
    """Context that runs commands directly inside the local work directory.

    The "remote" root is derived from the local root, so both point at the
    same directory tree; ``upload``, ``download`` and ``clean`` do nothing.
    """

    def __init__(self,
                 local_root,
                 remote_root=None,
                 remote_profile=None,
                 *args,
                 **kwargs,
                 ):
        """Create the context.

        Parameters
        ----------
        local_root : str
            Directory that holds the submissions; stored as an absolute path.
        remote_root : str, optional
            Recorded in ``init_remote_root`` only. The effective remote root
            is always computed from ``local_root``.
        remote_profile : dict, optional
            Stored as ``self.remote_profile``; a fresh empty dict is used when
            omitted so that instances never share one default dict.
        """
        assert isinstance(local_root, str)
        self.init_local_root = local_root
        self.init_remote_root = remote_root
        self.temp_local_root = os.path.abspath(local_root)
        # Local and remote sides deliberately share the same directory.
        self.temp_remote_root = os.path.abspath(local_root)
        self.remote_profile = {} if remote_profile is None else remote_profile

    @classmethod
    def load_from_dict(cls, context_dict):
        """Build a context from a dict.

        ``context_dict`` must contain ``'local_root'``; ``'remote_root'``
        (default ``None``) and ``'remote_profile'`` (default ``{}``) are
        optional.
        """
        local_root = context_dict['local_root']
        remote_root = context_dict.get('remote_root', None)
        remote_profile = context_dict.get('remote_profile', {})
        instance = cls(
            local_root=local_root,
            remote_root=remote_root,
            remote_profile=remote_profile
        )
        return instance

    def bind_submission(self, submission):
        """Attach ``submission`` and derive the working directories from it.

        Both ``local_root`` and ``remote_root`` are set to
        ``<temp_local_root>/<submission.work_base>``.
        """
        self.submission = submission
        self.local_root = os.path.join(self.temp_local_root, submission.work_base)
        self.remote_root = os.path.join(self.temp_local_root, submission.work_base)
        # dlog.debug("debug:LazyLocalContext.bind_submission;"
        #     "submission.submission_hash:{submission.submission_hash};"
        #     "self.local_root:{self.local_root};"
        #     "self.remote_root:{self.remote_root}")

    def get_job_root(self):
        """Return the directory in which commands are executed."""
        return self.local_root

    def upload(self,
               jobs,
               # local_up_files,
               dereference=True):
        """Do nothing: local and remote directories are the same."""
        pass

    def download(self,
                 jobs,
                 # remote_down_files,
                 check_exists=False,
                 mark_failure=True,
                 back_error=False):
        """Do nothing: local and remote directories are the same."""
        pass
        # for ii in job_dirs :
        #     for jj in remote_down_files :
        #         fname = os.path.join(self.local_root, ii, jj)
        #         exists = os.path.exists(fname)
        #         if not exists:
        #             if check_exists:
        #                 if mark_failure:
        #                     with open(os.path.join(self.local_root, ii, 'tag_failure_download_%s' % jj), 'w') as fp: pass
        #                 else:
        #                     pass
        #             else:
        #                 raise RuntimeError('do not find download file ' + fname)

    def block_checkcall(self, cmd):
        """Run ``cmd`` through the shell in ``local_root`` and wait for it.

        Returns
        -------
        tuple
            ``(None, stdout, stderr)`` where the streams are ``SPRetObj``.

        Raises
        ------
        RuntimeError
            If the command exits with a non-zero return code.
        """
        code, stdin, stdout, stderr = self.block_call(cmd)
        if code != 0:
            # Use ``%`` so the placeholders are actually filled in; passing
            # the tuple as a second argument left the message unformatted.
            raise RuntimeError(
                "Get error code %d in locally calling %s with job: %s "
                % (code, cmd, self.submission.submission_hash)
            )
        return stdin, stdout, stderr

    def block_call(self, cmd):
        """Run ``cmd`` through the shell in ``local_root`` and wait for it.

        Returns
        -------
        tuple
            ``(returncode, None, stdout, stderr)`` where the streams are
            ``SPRetObj``. A non-zero return code does not raise.
        """
        # ``cwd=`` sets the child's working directory without changing (and
        # possibly failing to restore) the working directory of this process.
        proc = sp.Popen(cmd, cwd=self.local_root, shell=True,
                        stdout=sp.PIPE, stderr=sp.PIPE)
        o, e = proc.communicate()
        stdout = SPRetObj(o)
        stderr = SPRetObj(e)
        code = proc.returncode
        return code, None, stdout, stderr

    def clean(self):
        """Do nothing."""
        pass

    def write_file(self, fname, write_str):
        """Write ``write_str`` to ``fname`` under ``remote_root``.

        ``remote_root`` is created first if it does not exist.
        """
        os.makedirs(self.remote_root, exist_ok=True)
        with open(os.path.join(self.remote_root, fname), 'w') as fp:
            fp.write(write_str)

    def read_file(self, fname):
        """Return the text content of ``fname`` under ``remote_root``."""
        with open(os.path.join(self.remote_root, fname), 'r') as fp:
            ret = fp.read()
        return ret

    def check_file_exists(self, fname):
        """Return True if ``fname`` under ``remote_root`` is a regular file."""
        return os.path.isfile(os.path.join(self.remote_root, fname))

    def call(self, cmd):
        """Start ``cmd`` through the shell in ``local_root`` without waiting.

        Returns the ``subprocess.Popen`` object, with stdout and stderr piped.
        """
        proc = sp.Popen(cmd, cwd=self.local_root, shell=True,
                        stdout=sp.PIPE, stderr=sp.PIPE)
        return proc

    def kill(self, proc):
        """Kill the process started by ``call``."""
        proc.kill()

    def check_finish(self, proc):
        """Return True once ``proc`` has terminated."""
        return proc.poll() is not None

    def get_return(self, proc):
        """Collect the result of a process started by ``call``.

        Returns
        -------
        tuple
            ``(None, None, None)`` while the process is still running;
            otherwise ``(returncode, stdout, stderr)``. The streams are
            ``SPRetObj`` instances, or ``None`` if reading them raised
            ``subprocess.SubprocessError``.
        """
        ret = proc.poll()
        if ret is None:
            return None, None, None
        try:
            o, e = proc.communicate()
            stdout = SPRetObj(o)
            stderr = SPRetObj(e)
        except sp.SubprocessError:
            stdout = None
            stderr = None
        return ret, stdout, stderr