import os
import shutil
import uuid
import tqdm
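
# The Bohrium SDK is an optional dependency: record whether the import
# succeeded so that __init__ can raise a clear error when it is missing.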
try:
    from bohriumsdk.client import Client
    from bohriumsdk.job import Job
    from bohriumsdk.storage import Storage
    from bohriumsdk.util import Util
except ModuleNotFoundError:
    found_bohriumsdk = False
else:
    found_bohriumsdk = True
from dpdispatcher.base_context import BaseContext
from dpdispatcher.dlog import dlog
from dpdispatcher.utils.job_status import JobStatus
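
# Local directory used to persist submission records and other small state
# files between runs (see write_home_file/read_home_file below).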
DP_CLOUD_SERVER_HOME_DIR = os.path.join(
    os.path.expanduser("~"), ".dpdispatcher/", "dp_cloud_server/"
)
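
# Context that runs jobs on the Bohrium platform through its OpenAPI:
# inputs are zipped and pushed to object storage, jobs are created via
# bohriumsdk, and results are pulled back and unpacked locally.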
class OpenAPIContext(BaseContext):
    def __init__(
        self,
        local_root,
        remote_root=None,
        remote_profile={},
        *args,
        **kwargs,
    ):
        if not found_bohriumsdk:
            raise ModuleNotFoundError(
                "bohriumsdk not installed. Install dpdispatcher with `pip install dpdispatcher[bohrium]`"
            )
        self.init_local_root = local_root
        self.init_remote_root = remote_root
        self.temp_local_root = os.path.abspath(local_root)
        self.remote_profile = remote_profile
        self.client = Client()
        self.storage = Storage(client=self.client)
        self.job = Job(client=self.client)
        self.util = Util()
        self.jgid = None
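
    # Build a context from a machine-style dict. Only "local_root",
    # "remote_root", and "remote_profile" are read; any other keys in the
    # dict are ignored.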
    @classmethod
    def load_from_dict(cls, context_dict):
        local_root = context_dict.get("local_root", "./")
        remote_root = context_dict.get("remote_root", None)
        remote_profile = context_dict.get("remote_profile", {})
        bohrium_context = cls(
            local_root=local_root,
            remote_root=remote_root,
            remote_profile=remote_profile,
        )
        return bohrium_context
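
    # Attach a submission to this context. The effective local root becomes
    # <temp_local_root>/<work_base>; the remote root is simply "." because
    # uploads use the storePath returned by the job-creation API instead.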
    def bind_submission(self, submission):
        self.submission = submission
        self.local_root = os.path.join(self.temp_local_root, submission.work_base)
        self.remote_root = "."
        self.submission_hash = submission.submission_hash
        self.machine = submission.machine
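
    # Compute (and cache on the job) the object-storage key for a job's
    # input archive: <project_id>/<random uuid>/<zip_filename>.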
    def _gen_object_key(self, job, zip_filename):
        if hasattr(job, "upload_path") and job.upload_path:
            return job.upload_path
        else:
            project_id = self.remote_profile.get("project_id")
            uid = uuid.uuid4()
            path = os.path.join(str(project_id), str(uid), zip_filename)
            job.upload_path = path
            return path
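
    # Upload one job: generate the local run script, zip the script plus all
    # forward files, create the job on Bohrium (reusing the job group id once
    # one exists), then push the archive to the storePath returned by the
    # job-creation call using the one-time upload token.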
    def upload_job(self, job, common_files=None):
        if common_files is None:
            common_files = []
        self.machine.gen_local_script(job)
        zip_filename = job.job_hash + ".zip"
        zip_task_file = os.path.join(self.local_root, zip_filename)
        upload_file_list = [
            job.script_file_name,
            f"{job.script_file_name}.run",
        ]
        upload_file_list.extend(common_files)
        for task in job.job_task_list:
            for file in task.forward_files:
                upload_file_list.append(os.path.join(task.task_work_path, file))
        upload_zip = Util.zip_file_list(
            self.local_root, zip_task_file, file_list=upload_file_list
        )
        project_id = self.remote_profile.get("project_id", 0)
        data = self.job.create(
            project_id=project_id,
            name=self.remote_profile.get("job_name", "DP-GEN"),
            group_id=self.jgid,  # type: ignore
        )
        self.jgid = data.get("jobGroupId", "")  # type: ignore
        token = data.get("token", "")  # type: ignore
        object_key = os.path.join(data["storePath"], zip_filename)  # type: ignore
        job.upload_path = object_key
        job.job_id = data["jobId"]  # type: ignore
        job.jgid = data["jobGroupId"]  # type: ignore
        self.storage.upload_From_file_multi_part(
            object_key=object_key, file_path=upload_zip, token=token
        )
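
    # Upload every job in the submission that has not run yet (unsubmitted or
    # terminated); already-uploaded jobs are skipped.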
    def upload(self, submission):
        bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
        job_to_be_uploaded = []
        result = None
        dlog.info("checking whether all jobs have been uploaded")
        for job in submission.belonging_jobs:
            if job.job_state in (JobStatus.unsubmitted, JobStatus.terminated):
                job_to_be_uploaded.append(job)
        if len(job_to_be_uploaded) == 0:
            dlog.info("all jobs have been uploaded, continuing")
            return result
        for job in tqdm.tqdm(
            job_to_be_uploaded,
            desc="Uploading to tiefblue",
            bar_format=bar_format,
            leave=False,
            disable=None,
        ):
            self.upload_job(job, submission.forward_common_files)
        return result
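
    # Download finished jobs: query each job's detail, keep entries that are
    # finished (status == 2) and expose a resultUrl, then fetch and unzip each
    # result archive into local_root. Downloaded archives are moved into a
    # backup directory, which doubles as the marker that a job has already
    # been downloaded.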
    def download(self, submission):
        jobs = submission.belonging_jobs
        job_hashs = {}
        job_infos = {}
        job_result = []
        for job in jobs:
            jid = job.job_id
            job_hashs[jid] = job.job_hash
            jobinfo = self.job.detail(jid)
            job_result.append(jobinfo)
        for each in job_result:
            if "resultUrl" in each and each["resultUrl"] != "" and each["status"] == 2:
                job_hash = ""
                if each["id"] not in job_hashs:
                    dlog.info(
                        f"found unexpected job_hash; task {each['id']} will still be downloaded."
                    )
                    dlog.debug(str(job_hashs))
                    job_hash = str(each["id"])
                else:
                    job_hash = job_hashs[each["id"]]
                job_infos[job_hash] = each
        bar_format = "{l_bar}{bar}| {n:.02f}/{total:.02f} % [{elapsed}<{remaining}, {rate_fmt}{postfix}]"
        for job_hash, info in tqdm.tqdm(
            job_infos.items(),
            desc="Validating download file from Lebesgue",
            bar_format=bar_format,
            leave=False,
            disable=None,
        ):
            result_filename = job_hash + "_back.zip"
            target_result_zip = os.path.join(self.local_root, result_filename)
            if self._check_if_job_has_already_downloaded(
                target_result_zip, self.local_root
            ):
                continue
            self.storage.download_from_url(info["resultUrl"], target_result_zip)
            Util.unzip_file(target_result_zip, out_dir=self.local_root)
            self._backup(self.local_root, target_result_zip)
        self._clean_backup(
            self.local_root, keep_backup=self.remote_profile.get("keep_backup", True)
        )
        return True
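
    # File helpers: submission state is persisted under
    # DP_CLOUD_SERVER_HOME_DIR rather than on a remote filesystem, so
    # read_file/write_file/check_file_exists all delegate to the *_home_file
    # variants.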
    def write_file(self, fname, write_str):
        result = self.write_home_file(fname, write_str)
        return result

    def write_local_file(self, fname, write_str):
        local_filename = os.path.join(self.local_root, fname)
        with open(local_filename, "w") as f:
            f.write(write_str)
        return local_filename

    def read_file(self, fname):
        result = self.read_home_file(fname)
        return result

    def write_home_file(self, fname, write_str):
        # Ensure the home directory exists before writing into it.
        os.makedirs(DP_CLOUD_SERVER_HOME_DIR, exist_ok=True)
        with open(os.path.join(DP_CLOUD_SERVER_HOME_DIR, fname), "w") as fp:
            fp.write(write_str)
        return True

    def read_home_file(self, fname):
        with open(os.path.join(DP_CLOUD_SERVER_HOME_DIR, fname)) as fp:
            ret = fp.read()
        return ret

    def check_file_exists(self, fname):
        result = self.check_home_file_exists(fname)
        return result

    def check_home_file_exists(self, fname):
        return os.path.isfile(os.path.join(DP_CLOUD_SERVER_HOME_DIR, fname))

    def clean(self):
        submission_file_name = f"{self.submission.submission_hash}.json"
        submission_json = os.path.join(DP_CLOUD_SERVER_HOME_DIR, submission_file_name)
        os.remove(submission_json)
        return True
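
    # Private helpers for managing the local backup directory of downloaded
    # result archives.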
    def _check_if_job_has_already_downloaded(self, target, local_root):
        backup_file_location = os.path.join(
            local_root, "backup", os.path.split(target)[1]
        )
        return os.path.exists(backup_file_location)

    def _backup(self, local_root, target):
        try:
            # Move the downloaded archive into the backup directory.
            os.makedirs(os.path.join(local_root, "backup"), exist_ok=True)
            shutil.move(
                target, os.path.join(local_root, "backup", os.path.split(target)[1])
            )
        except (OSError, shutil.Error) as e:
            dlog.exception(f"unable to back up file: {e}")

    def _clean_backup(self, local_root, keep_backup=True):
        if not keep_backup:
            dir_to_be_removed = os.path.join(local_root, "backup")
            if os.path.exists(dir_to_be_removed):
                shutil.rmtree(dir_to_be_removed)
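

# A minimal usage sketch (not part of the module): construct the context from
# a machine-style dict. Assumes Bohrium credentials are already configured for
# bohriumsdk.Client; the project_id below is a placeholder.
#
#     context = OpenAPIContext.load_from_dict(
#         {
#             "local_root": "./work",
#             "remote_profile": {"project_id": 12345, "job_name": "demo"},
#         }
#     )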