Source code for dpdispatcher.hdfs_context

from dpdispatcher.base_context import BaseContext
import os,shutil,hashlib,tarfile
from glob import glob
from dpdispatcher import dlog
from dpdispatcher.hdfs_cli import HDFS

[docs]class HDFSContext(BaseContext) : def __init__(self, local_root, remote_root, remote_profile={}, *args, **kwargs, ): assert(type(local_root) == str) self.init_local_root = local_root self.init_remote_root = remote_root self.temp_local_root = os.path.abspath(local_root) self.temp_remote_root = remote_root self.remote_profile = remote_profile
[docs] @classmethod def load_from_dict(cls, context_dict): local_root = context_dict['local_root'] remote_root = context_dict['remote_root'] remote_profile = context_dict.get('remote_profile', {}) instance = cls( local_root=local_root, remote_root=remote_root, remote_profile=remote_profile ) return instance
[docs] def get_job_root(self): return self.remote_root
[docs] def bind_submission(self, submission): self.submission = submission self.local_root = os.path.join(self.temp_local_root, submission.work_base) self.remote_root = os.path.join(self.temp_remote_root, submission.submission_hash) HDFS.mkdir(self.remote_root)
def _put_files(self, files, dereference=True): of = self.submission.submission_hash + '_upload.tgz' # local tar cwd = os.getcwd() os.chdir(self.local_root) if os.path.isfile(of) : os.remove(of) with tarfile.open(of, "w:gz", dereference=dereference) as tar: for ii in files : tar.add(ii) os.chdir(cwd) # trans from_f = os.path.join(self.local_root, of) HDFS.copy_from_local(os.path.join(self.local_root, of), self.remote_root) # clean up os.remove(from_f)
[docs] def upload(self, submission, dereference=True): """ upload forward files and forward command files to HDFS root dir Parameters ---------- submission : Submission class instance represents a collection of tasks, such as forward file names Returns ------- none """ cwd = os.getcwd() os.chdir(self.local_root) file_list = [] for task in submission.belonging_tasks: local_job = os.path.join(self.local_root, task.task_work_path) for ff in task.forward_files: abs_file_list = glob(os.path.join(local_job, ff)) if not abs_file_list: os.chdir(cwd) raise RuntimeError('cannot find upload file ' + os.path.join(local_job, ff)) rel_file_list = [os.path.relpath(ii, self.local_root) for ii in abs_file_list] file_list.extend(rel_file_list) local_job = self.local_root for fc in submission.forward_common_files: abs_file_list = glob(os.path.join(local_job, fc)) if not abs_file_list: os.chdir(cwd) raise RuntimeError('cannot find upload file ' + os.path.join(local_job, fc)) rel_file_list = [os.path.relpath(ii, self.local_root) for ii in abs_file_list] file_list.extend(rel_file_list) self._put_files(file_list, dereference=dereference) os.chdir(cwd)
[docs] def download(self, submission, check_exists= False, mark_failure = True, back_error = False): """ download backward files from HDFS root dir Parameters ---------- submission : Submission class instance represents a collection of tasks, such as backward file names Returns ------- none """ cwd = os.getcwd() # download all hdfs files to tmp dir os.chdir(self.local_root) gz_dir = os.path.join(self.local_root, 'tmp') if os.path.exists(gz_dir): shutil.rmtree(gz_dir, ignore_errors=True) os.mkdir('tmp') os.chdir(gz_dir) rfile_tgz = "%s/%s_*_download.tar.gz" % (self.remote_root, submission.submission_hash) lfile_tgz = "%s/tmp/" % (self.local_root) HDFS.copy_to_local(rfile_tgz, lfile_tgz) tgz_file_list = glob(os.path.join(self.local_root, "tmp/*_download.tar.gz")) for tgz in tgz_file_list: with tarfile.open(tgz, "r:gz") as tar: tar.extractall() os.chdir(self.local_root) for task in submission.belonging_tasks: local_job = os.path.join(self.local_root, task.task_work_path) remote_job = os.path.join(gz_dir, task.task_work_path) flist = task.backward_files if back_error: errors = glob(os.path.join(remote_job, 'error*')) flist.extend(errors) for jj in flist: rfile = os.path.join(remote_job, jj) lfile = os.path.join(local_job, jj) if not os.path.exists(rfile): if check_exists: if mark_failure: with open(os.path.join(self.local_root, ii, 'tag_failure_download_%s' % jj), 'w') as fp: pass else: raise RuntimeError('do not find download file ' + rfile) else: raise RuntimeError('do not find download file ' + rfile) else: if os.path.exists(lfile): dlog.info('find existing %s, replacing by %s' % (lfile, rfile)) if os.path.isdir(lfile): shutil.rmtree(lfile, ignore_errors=True) elif os.path.isfile(lfile): os.remove(lfile) shutil.move(rfile, lfile) os.chdir(cwd) local_job = self.local_root remote_job = gz_dir flist = submission.backward_common_files if back_error: errors = glob(os.path.join(remote_job, 'error*')) flist.extend(errors) for jj in flist: rfile = os.path.join(remote_job, jj) lfile = os.path.join(local_job, jj) if not os.path.exists(rfile): if check_exists: if mark_failure: with open(os.path.join(self.local_root, ii, 'tag_failure_download_%s' % jj), 'w') as fp: pass else: raise RuntimeError('do not find download file ' + rfile) else: raise RuntimeError('do not find download file ' + rfile) else: if os.path.exists(lfile): dlog.info('find existing %s, replacing by %s' % (lfile, rfile)) if os.path.isdir(lfile): shutil.rmtree(lfile, ignore_errors=True) elif os.path.isfile(lfile): os.remove(lfile) shutil.move(rfile, lfile) # remove tmp dir shutil.rmtree(gz_dir, ignore_errors=True)
def check_file_exists(self, fname): """ check whether the given file exists, often used in checking whether the belonging job has finished Parameters ---------- fname : string file name to be checked Returns ------- status: boolean """ return HDFS.exists(os.path.join(self.remote_root, fname))
[docs] def clean(self): HDFS.remove(self.remote_root)
[docs] def write_file(self, fname, write_str): local_file = os.path.join("/tmp/", fname) with open(local_file, 'w') as fp: fp.write(write_str) HDFS.copy_from_local(local_file, os.path.join(self.remote_root, fname)) return local_file
[docs] def read_file(self, fname): return HDFS.read_hdfs_file(os.path.join(self.remote_root, fname))
[docs] def check_file_exists(self, fname): return HDFS.exists(os.path.join(self.remote_root, fname))