from dpdispatcher.base_context import BaseContext
import os
import subprocess as sp
[docs]class SPRetObj(object) :
def __init__ (self,
ret) :
self.data = ret
[docs] def read(self) :
return self.data
[docs] def readlines(self) :
lines = self.data.decode('utf-8').splitlines()
ret = []
for aa in lines:
ret.append(aa+'\n')
return ret
[docs]class LazyLocalContext(BaseContext) :
def __init__ (self,
local_root,
remote_root=None,
remote_profile={},
*args,
**kwargs,
):
"""
local_root:
remote_root:
remote_profile:
"""
assert(type(local_root) == str)
self.init_local_root = local_root
self.init_remote_root = remote_root
self.temp_local_root = os.path.abspath(local_root)
self.temp_remote_root = os.path.abspath(local_root)
self.remote_profile = remote_profile
# self.job_uuid = None
# self.submission = None
# if job_uuid:
# self.job_uuid=job_uuid
# else:
# self.job_uuid = str(uuid.uuid4())
[docs] @classmethod
def load_from_dict(cls, context_dict):
local_root = context_dict['local_root']
remote_root = context_dict.get('remote_root', None)
remote_profile = context_dict.get('remote_profile', {})
instance = cls(
local_root=local_root,
remote_root=remote_root,
remote_profile=remote_profile
)
return instance
[docs] def bind_submission(self, submission):
self.submission = submission
self.local_root = os.path.join(self.temp_local_root, submission.work_base)
self.remote_root = os.path.join(self.temp_local_root, submission.work_base)
# dlog.debug("debug:LazyLocalContext.bind_submission;"
# "submission.submission_hash:{submission.submission_hash};"
# "self.local_root:{self.local_root};"
# "self.remote_root:{self.remote_root}")
[docs] def get_job_root(self) :
return self.local_root
[docs] def upload(self,
jobs,
# local_up_files,
dereference = True) :
pass
[docs] def download(self,
jobs,
# remote_down_files,
check_exists = False,
mark_failure = True,
back_error=False) :
pass
# for ii in job_dirs :
# for jj in remote_down_files :
# fname = os.path.join(self.local_root, ii, jj)
# exists = os.path.exists(fname)
# if not exists:
# if check_exists:
# if mark_failure:
# with open(os.path.join(self.local_root, ii, 'tag_failure_download_%s' % jj), 'w') as fp: pass
# else:
# pass
# else:
# raise RuntimeError('do not find download file ' + fname)
[docs] def block_checkcall(self,
cmd) :
cwd = os.getcwd()
# script_dir = os.path.join(self.local_root, self.submission.work_base)
os.chdir(self.local_root)
# os.chdir(script_dir)
proc = sp.Popen(cmd, shell=True, stdout = sp.PIPE, stderr = sp.PIPE)
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
code = proc.returncode
if code != 0:
os.chdir(cwd)
raise RuntimeError("Get error code %d in locally calling %s with job: %s ", (code, cmd, self.submission.submission_hash))
os.chdir(cwd)
return None, stdout, stderr
[docs] def block_call(self, cmd) :
cwd = os.getcwd()
os.chdir(self.local_root)
proc = sp.Popen(cmd, shell=True, stdout = sp.PIPE, stderr = sp.PIPE)
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
code = proc.returncode
os.chdir(cwd)
return code, None, stdout, stderr
[docs] def clean(self) :
pass
[docs] def write_file(self, fname, write_str):
os.makedirs(self.remote_root, exist_ok = True)
with open(os.path.join(self.remote_root, fname), 'w') as fp :
fp.write(write_str)
[docs] def read_file(self, fname):
with open(os.path.join(self.remote_root, fname), 'r') as fp:
ret = fp.read()
return ret
[docs] def check_file_exists(self, fname):
# submission_work_base = os.path.join(self.local_root, self.submission.work_base)
# file_to_be_checked = os.path.join(submission_work_base, fname)
# print('debug:dpdispatcher.LazyLocalContext().check_file_exists:file_to_be_checked', file_to_be_checked)
# return os.path.isfile(file_to_be_checked)
return os.path.isfile(os.path.join(self.remote_root, fname))
[docs] def call(self, cmd) :
cwd = os.getcwd()
os.chdir(self.local_root)
proc = sp.Popen(cmd, shell=True, stdout = sp.PIPE, stderr = sp.PIPE)
os.chdir(cwd)
return proc
[docs] def kill(self, proc):
proc.kill()
[docs] def check_finish(self, proc):
return (proc.poll() != None)
[docs] def get_return(self, proc):
ret = proc.poll()
if ret is None:
return None, None, None
else :
try:
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
except sp.SubprocessError:
stdout = None
stderr = None
return ret, stdout, stderr