# Source code for dpdispatcher.local_context

from dpdispatcher.base_context import BaseContext
import os,shutil,hashlib,signal
import subprocess as sp
from glob import glob
from dpdispatcher import dlog
from subprocess import TimeoutExpired

# class LocalSession (object) :
#     def __init__ (self, jdata) :
#         self.work_path = os.path.abspath(jdata['work_path'])
#         assert(os.path.exists(self.work_path))

#     def get_work_root(self) :
#         return self.work_path

class SPRetObj(object):
    """Small file-like wrapper around a payload captured from a subprocess pipe.

    Exposes ``read()`` and ``readlines()`` over the stored data (bytes when it
    comes from ``Popen.communicate()``).
    """

    def __init__(self, ret):
        # The captured payload, kept exactly as given.
        self.data = ret

    def read(self):
        """Return the stored payload unchanged."""
        return self.data

    def readlines(self):
        """Decode the payload as UTF-8 and return it as a list of lines.

        Lines are split with ``str.splitlines()``, which drops the original
        terminators. Every returned line, including the last one, then gets a
        single ``'\\n'`` appended. An empty payload yields an empty list.
        """
        text = self.data.decode('utf-8')
        return [line + '\n' for line in text.splitlines()]
def _check_file_path(fname) : dirname = os.path.dirname(fname) if dirname != "": os.makedirs(dirname, exist_ok=True) def _identical_files(fname0, fname1) : with open(fname0) as fp: code0 = hashlib.sha1(fp.read().encode('utf-8')).hexdigest() with open(fname1) as fp: code1 = hashlib.sha1(fp.read().encode('utf-8')).hexdigest() return code0 == code1
class LocalContext(BaseContext):
    """Context that stages and runs jobs on the local machine.

    "Uploading" symlinks the requested input files from the local work tree
    into a per-submission directory under ``remote_root``. "Downloading"
    moves or copies result files back. Commands run with that per-submission
    directory as their working directory.
    """

    def __init__(self,
                 local_root,
                 remote_root,
                 remote_profile=None,
                 *args,
                 **kwargs,
                 ):
        """
        local_root: base directory of the local work tree (must be a str).
        remote_root: directory under which per-submission work directories
            are created.
        remote_profile: stored as ``self.remote_profile``; not read elsewhere
            in this class. Defaults to a fresh empty dict.
        """
        assert isinstance(local_root, str)
        self.init_local_root = local_root
        self.init_remote_root = remote_root
        self.temp_local_root = os.path.abspath(local_root)
        self.temp_remote_root = os.path.abspath(remote_root)
        # Avoid a shared mutable default: every instance gets its own dict.
        self.remote_profile = {} if remote_profile is None else remote_profile

    @classmethod
    def load_from_dict(cls, context_dict):
        """Build a context from a dict with 'local_root', 'remote_root' and an
        optional 'remote_profile' (default ``{}``)."""
        local_root = context_dict['local_root']
        remote_root = context_dict['remote_root']
        remote_profile = context_dict.get('remote_profile', {})
        instance = cls(
            local_root=local_root,
            remote_root=remote_root,
            remote_profile=remote_profile
        )
        return instance

    def get_job_root(self):
        """Return the per-submission remote directory (set by bind_submission)."""
        return self.remote_root

    def bind_submission(self, submission):
        """Attach *submission* and derive the local and remote working roots.

        local_root  = <local_root>/<submission.work_base>
        remote_root = <remote_root>/<submission.submission_hash>
        """
        self.submission = submission
        self.local_root = os.path.join(self.temp_local_root, submission.work_base)
        self.remote_root = os.path.join(self.temp_remote_root, submission.submission_hash)

    # ------------------------------------------------------------------ upload

    def upload(self, submission):
        """Symlink each task's ``forward_files`` and the submission's
        ``forward_common_files`` from the local tree into the remote tree.

        Entries are glob patterns relative to the task (or submission) directory.

        Raises RuntimeError if a pattern matches nothing or a matched file is
        missing.
        """
        os.makedirs(self.remote_root, exist_ok=True)
        for task in submission.belonging_tasks:
            local_job = os.path.join(self.local_root, task.task_work_path)
            remote_job = os.path.join(self.remote_root, task.task_work_path)
            os.makedirs(remote_job, exist_ok=True)
            self._link_files(local_job, remote_job, task.forward_files)
        self._link_files(self.local_root, self.remote_root, submission.forward_common_files)

    def upload_(self, job_dirs, local_up_files, dereference=True):
        """Legacy upload: symlink literal file names (no globbing) for each job dir.

        ``dereference`` is accepted for interface compatibility and is unused.
        """
        for job_dir in job_dirs:
            local_job = os.path.join(self.local_root, job_dir)
            remote_job = os.path.join(self.remote_root, job_dir)
            os.makedirs(remote_job, exist_ok=True)
            for fname in local_up_files:
                self._link_one(local_job, remote_job, fname)

    def _link_files(self, local_dir, remote_dir, patterns):
        """Expand glob *patterns* under *local_dir* and symlink every match to
        the same relative location under *remote_dir*.

        All patterns are expanded before any link is made. A pattern that
        matches nothing therefore aborts before any link is created.
        """
        rel_paths = []
        for pattern in patterns:
            matches = glob(os.path.join(local_dir, pattern))
            if not matches:
                raise RuntimeError('cannot find upload file ' + os.path.join(local_dir, pattern))
            rel_paths.extend(os.path.relpath(path, start=local_dir) for path in matches)
        for rel_path in rel_paths:
            self._link_one(local_dir, remote_dir, rel_path)

    @staticmethod
    def _link_one(local_dir, remote_dir, rel_path):
        """Symlink local_dir/rel_path at remote_dir/rel_path, replacing any
        existing file or link there and creating parent directories."""
        src = os.path.join(local_dir, rel_path)
        if not os.path.exists(src):
            raise RuntimeError('cannot find upload file ' + src)
        dst = os.path.join(remote_dir, rel_path)
        # lexists (not exists) so a dangling symlink from an earlier upload is
        # also removed; otherwise os.symlink raises FileExistsError.
        if os.path.lexists(dst):
            os.remove(dst)
        _check_file_path(dst)
        os.symlink(src, dst)

    # ---------------------------------------------------------------- download

    def download(self, submission, check_exists=False, mark_failure=True, back_error=False):
        """Bring each task's ``backward_files`` and the submission's
        ``backward_common_files`` back from the remote tree.

        check_exists: if True, a file missing on both sides is tolerated
            (optionally tagged) instead of raising RuntimeError.
        mark_failure: with check_exists, create 'tag_failure_download_<name>'
            in the local directory for each missing file.
        back_error: also fetch every 'error*' entry of the remote directory.
        """
        for task in submission.belonging_tasks:
            local_job = os.path.join(self.local_root, task.task_work_path)
            remote_job = os.path.join(self.remote_root, task.task_work_path)
            # Copy so the task's own backward_files list is never mutated.
            flist = list(task.backward_files)
            if back_error:
                flist += self._glob_in(remote_job, 'error*')
            self._fetch_files(local_job, remote_job, flist,
                              check_exists, mark_failure, replace_by_move=False)
        flist = list(submission.backward_common_files)
        if back_error:
            flist += self._glob_in(self.remote_root, 'error*')
        self._fetch_files(self.local_root, self.remote_root, flist,
                          check_exists, mark_failure, replace_by_move=False)

    def download_(self, job_dirs, remote_down_files, check_exists=False, mark_failure=True, back_error=False):
        """Legacy download of *remote_down_files* for each job dir.

        Same flags as :meth:`download`. When a file exists on both sides, the
        remote one is moved over the local one.
        """
        for job_dir in job_dirs:
            local_job = os.path.join(self.local_root, job_dir)
            remote_job = os.path.join(self.remote_root, job_dir)
            # Fresh list per job: the caller's list stays untouched and one
            # job's error files do not leak into the next job's list.
            flist = list(remote_down_files)
            if back_error:
                flist += self._glob_in(remote_job, 'error*')
            self._fetch_files(local_job, remote_job, flist,
                              check_exists, mark_failure, replace_by_move=True)

    @staticmethod
    def _glob_in(directory, pattern):
        """Return glob matches of *pattern* relative to *directory*.

        The process cwd is always restored, even when glob or chdir fails.
        """
        cwd = os.getcwd()
        os.chdir(directory)
        try:
            return glob(pattern)
        finally:
            os.chdir(cwd)

    @staticmethod
    def _fetch_files(local_dir, remote_dir, flist, check_exists, mark_failure, replace_by_move):
        """Move/copy each name in *flist* from *remote_dir* to *local_dir*.

        - same real path on both sides (uploaded symlink): nothing to do
        - missing on both sides: raise, or tag the failure if check_exists
        - only local exists: already downloaded, nothing to do
        - only remote exists: move it
        - both exist: replace the local copy (move if replace_by_move, else copy)
        """
        for fname in flist:
            rfile = os.path.join(remote_dir, fname)
            lfile = os.path.join(local_dir, fname)
            if os.path.realpath(rfile) == os.path.realpath(lfile):
                continue
            r_exists = os.path.exists(rfile)
            l_exists = os.path.exists(lfile)
            if not r_exists and not l_exists:
                if not check_exists:
                    raise RuntimeError('do not find download file ' + rfile)
                if mark_failure:
                    # NOTE(review): a name containing '/' makes this path point
                    # into a sub-directory that may not exist.
                    tag_file_path = os.path.join(local_dir, 'tag_failure_download_%s' % fname)
                    with open(tag_file_path, 'w'):
                        pass
            elif not r_exists:
                continue
            elif not l_exists:
                _check_file_path(lfile)
                shutil.move(rfile, lfile)
            else:
                dlog.info('find existing %s, replacing by %s' % (lfile, rfile))
                # Check links first: rmtree refuses symlinks, which would leave
                # a link-to-dir in place and redirect the copy/move into it.
                if os.path.islink(lfile) or os.path.isfile(lfile):
                    os.remove(lfile)
                elif os.path.isdir(lfile):
                    shutil.rmtree(lfile, ignore_errors=True)
                if replace_by_move:
                    shutil.move(rfile, lfile)
                elif os.path.isdir(rfile):
                    shutil.copytree(rfile, lfile)
                else:
                    shutil.copyfile(rfile, lfile)

    # ---------------------------------------------------------------- commands

    def block_checkcall(self, cmd):
        """Run *cmd* in the remote directory and wait for it.

        Returns (None, stdout, stderr) as SPRetObj wrappers.
        Raises RuntimeError on a non-zero exit code.
        """
        code, _, stdout, stderr = self.block_call(cmd)
        if code != 0:
            raise RuntimeError("Get error code %d in locally calling %s with job: %s "
                               % (code, cmd, self.submission.submission_hash))
        return None, stdout, stderr

    def block_call(self, cmd):
        """Run *cmd* through the shell in the remote directory and wait for it.

        Returns (returncode, None, stdout, stderr); the streams are SPRetObj.
        """
        # cwd= instead of os.chdir: the caller's working directory is never
        # touched, even if process creation fails.
        proc = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE, cwd=self.remote_root)
        o, e = proc.communicate()
        return proc.returncode, None, SPRetObj(o), SPRetObj(e)

    def clean(self):
        """Delete the remote directory tree, ignoring errors."""
        shutil.rmtree(self.remote_root, ignore_errors=True)

    def write_file(self, fname, write_str):
        """Write *write_str* to remote_root/fname, creating remote_root if needed."""
        os.makedirs(self.remote_root, exist_ok=True)
        with open(os.path.join(self.remote_root, fname), 'w') as fp:
            fp.write(write_str)

    def read_file(self, fname):
        """Return the text content of remote_root/fname."""
        with open(os.path.join(self.remote_root, fname), 'r') as fp:
            ret = fp.read()
        return ret

    def check_file_exists(self, fname):
        """Return True if remote_root/fname exists and is a regular file."""
        return os.path.isfile(os.path.join(self.remote_root, fname))

    def call(self, cmd):
        """Start *cmd* through the shell in the remote directory without waiting.

        Returns the Popen object.
        NOTE(review): stdout/stderr are pipes. If the child writes more than
        the pipe buffer before get_return() drains it, the child may block.
        """
        return sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE, cwd=self.remote_root)

    def kill(self, job_id):
        """Send SIGTERM to process *job_id*.

        NOTE(review): with shell=True the Popen pid is the shell's, so the
        shell's children may survive; confirm what callers pass here.
        """
        os.kill(job_id, signal.SIGTERM)

    def check_finish(self, proc):
        """Return True once *proc* has terminated."""
        return proc.poll() is not None

    def get_return(self, proc):
        """Return (returncode, stdout, stderr) of a finished *proc*.

        Returns (None, None, None) while it is still running.
        """
        ret = proc.poll()
        if ret is None:
            return None, None, None
        try:
            o, e = proc.communicate()
            stdout = SPRetObj(o)
            stderr = SPRetObj(e)
        except TimeoutExpired:
            # communicate() is called without a timeout, so this is not
            # expected; kept as a defensive fallback.
            stdout = None
            stderr = None
        return ret, stdout, stderr