Source code for dpdispatcher.dpcloudserver.api

import os
import json
import time

try:
    import oss2
    from oss2 import SizedFileAdapter, determine_part_size
    from oss2.models import PartInfo
except ImportError:
    pass
import requests
from urllib.parse import urljoin
from dpdispatcher import dlog

from .retcode import RETCODE
from .config import HTTP_TIME_OUT, API_HOST


class API:
    def __init__(self, email, password):
        self._login_data = {"password": password, "email": email}
        self._token = None
        self.refresh_token()
    def get(self, url, params, retry=0):
        headers = {'Authorization': "jwt " + self._token}
        ret = None
        # Retry the request up to three times, backing off 0/10/20 s after failed attempts.
        for retry_count in range(3):
            try:
                ret = requests.get(
                    urljoin(API_HOST, url),
                    params=params,
                    timeout=HTTP_TIME_OUT,
                    headers=headers,
                )
            except Exception as e:
                dlog.error(f"request error {e}")
                continue
            if ret.ok:
                break
            else:
                dlog.error(f"request error status_code:{ret.status_code} reason: {ret.reason} body: \n{ret.text}")
                time.sleep(retry_count * 10)
        if ret is None:
            raise ConnectionError("request fail")
        # print(url,'>>>', params, '<<<', ret.text)
        ret.raise_for_status()
        ret = json.loads(ret.text)
        # If the token has expired, refresh it and retry the call once more.
        if ret['code'] == RETCODE.TOKENINVALID and retry <= 3:
            dlog.info("debug: token expire, refresh token")
            if self._login_data is not None:
                self.refresh_token()
                ret = self.get(url, params, retry=retry + 1)
                return ret
        if ret['code'] != RETCODE.OK:
            raise ValueError(f"{url} Error: {ret['code']} {ret['message']}")
        return ret['data']
    def post(self, url, params, retry=0):
        headers = {'Authorization': "jwt " + self._token}
        ret = None
        # Same retry scheme as get(), with a shorter 0/1/2 s back-off.
        for retry_count in range(3):
            try:
                ret = requests.post(
                    urljoin(API_HOST, url),
                    json=params,
                    timeout=HTTP_TIME_OUT,
                    headers=headers,
                )
            except Exception as e:
                dlog.error(f"request error {e}")
                continue
            if ret.ok:
                break
            else:
                dlog.error(f"request error status_code:{ret.status_code} reason: {ret.reason} body: \n{ret.text}")
                time.sleep(retry_count)
        if ret is None:
            raise ConnectionError("request fail")
        ret.raise_for_status()
        ret = json.loads(ret.text)
        # print(url,'>>>', params, '<<<', ret.text)
        # If the token has expired, refresh it and retry the call once more.
        if ret['code'] == RETCODE.TOKENINVALID and retry <= 3:
            dlog.info("debug: token expire, refresh token")
            if self._login_data is not None:
                self.refresh_token()
                ret = self.post(url, params, retry=retry + 1)
                return ret
        if ret['code'] != RETCODE.OK:
            raise ValueError(f"{url} Error: {ret['code']} {ret['message']}")
        return ret['data']
    def refresh_token(self):
        url = '/account/login'
        ret = requests.post(
            urljoin(API_HOST, url),
            json=self._login_data,
            timeout=HTTP_TIME_OUT,
        )
        dlog.debug(f"debug: login ret:{ret}")
        ret = json.loads(ret.text)
        if ret['code'] != RETCODE.OK:
            raise ValueError(f"{url} Error: {ret['code']} {ret['message']}")
        self._token = ret['data']['token']
    def _get_oss_bucket(self, endpoint, bucket_name):
        # res = get("/tools/sts_token", {})
        res = self.get("/data/get_sts_token", {})
        # print('debug>>>>>>>>>>>>>', res)
        dlog.debug(f"debug: _get_oss_bucket: res:{res}")
        auth = oss2.StsAuth(
            res['AccessKeyId'],
            res['AccessKeySecret'],
            res['SecurityToken'],
        )
        return oss2.Bucket(auth, endpoint, bucket_name)
    def download(self, oss_file, save_file, endpoint, bucket_name):
        bucket = self._get_oss_bucket(endpoint, bucket_name)
        dlog.debug(f"debug: download: oss_file:{oss_file}; save_file:{save_file}")
        bucket.get_object_to_file(oss_file, save_file)
        return save_file
    def download_from_url(self, url, save_file):
        ret = None
        for retry_count in range(3):
            try:
                ret = requests.get(
                    url,
                    headers={'Authorization': "jwt " + self._token},
                    stream=True,
                )
            except Exception as e:
                dlog.error(f"request error {e}")
                continue
            if ret.ok:
                break
            else:
                dlog.error(f"request error status_code:{ret.status_code} reason: {ret.reason} body: \n{ret.text}")
                time.sleep(retry_count)
                ret = None
        if ret is not None:
            ret.raise_for_status()
            # Stream the response body to disk in 8 KiB chunks.
            with open(save_file, 'wb') as f:
                for chunk in ret.iter_content(chunk_size=8192):
                    f.write(chunk)
            ret.close()
    def upload(self, oss_task_zip, zip_task_file, endpoint, bucket_name):
        dlog.debug(f"debug: upload: oss_task_zip:{oss_task_zip}; zip_task_file:{zip_task_file}")
        bucket = self._get_oss_bucket(endpoint, bucket_name)
        total_size = os.path.getsize(zip_task_file)
        part_size = determine_part_size(total_size, preferred_size=1000 * 1024)
        upload_id = bucket.init_multipart_upload(oss_task_zip).upload_id
        parts = []
        # Upload the archive as an OSS multipart upload, one part at a time.
        with open(zip_task_file, 'rb') as fileobj:
            part_number = 1
            offset = 0
            while offset < total_size:
                num_to_upload = min(part_size, total_size - offset)
                result = bucket.upload_part(
                    oss_task_zip, upload_id, part_number,
                    SizedFileAdapter(fileobj, num_to_upload),
                )
                parts.append(PartInfo(part_number, result.etag))
                offset += num_to_upload
                part_number += 1
        result = bucket.complete_multipart_upload(oss_task_zip, upload_id, parts)
        # print('debug:upload_result:', result, dir())
        return result
    def job_create(self, job_type, oss_path, input_data, program_id=None, group_id=None):
        post_data = {
            'job_type': job_type,
            'oss_path': oss_path,
        }
        if program_id is not None:
            post_data["program_id"] = program_id
        if group_id is not None:
            post_data["job_group_id"] = group_id
        if input_data.get('command') is not None:
            post_data["cmd"] = input_data.get('command')
        if input_data.get('backward_files') is not None:
            post_data["out_files"] = input_data.get('backward_files')
        for k, v in input_data.items():
            post_data[k] = v
        ret = self.post('/data/v2/insert_job', post_data)
        group_id = ret.get('job_group_id')
        return ret['job_id'], group_id
    def get_jobs(self, page=1, per_page=10):
        ret = self.get(
            '/data/jobs',
            {
                'page': page,
                'per_page': per_page,
            }
        )
        return ret['items']
    def get_tasks(self, job_id, group_id, page=1, per_page=10):
        ret = self.get(
            f'data/job/{group_id}/tasks',
            {
                'page': page,
                'per_page': per_page,
            }
        )
        for each in ret['items']:
            if job_id == each["task_id"]:
                return each
        # Not on this page; keep paging through the group until it runs out.
        if len(ret['items']) != 0:
            return self.get_tasks(job_id, group_id, page=page + 1, per_page=per_page)
        return None
    def get_tasks_list(self, group_id, per_page=30):
        result = []
        page = 0
        while True:
            ret = self.get(
                f'data/job/{group_id}/tasks',
                {
                    'page': page,
                    'per_page': per_page,
                }
            )
            if len(ret['items']) == 0:
                break
            for each in ret['items']:
                result.append(each)
            page += 1
        return result
    def check_job_has_uploaded(self, job_id):
        try:
            if not job_id:
                return False
            # job_id may be a composite "<job_id>:job_group_id:<group_id>" string.
            if 'job_group_id' in job_id:
                ids = job_id.split(":job_group_id:")
                job_id, _ = int(ids[0]), int(ids[1])
            ret = self.get(f'data/job/{job_id}', {})
            if len(ret) == 0:
                return False
            if ret.get('input_data'):
                return True
            else:
                return False
        except ValueError as e:
            dlog.error(e)
            return False
    def get_job_result_url(self, job_id):
        try:
            if not job_id:
                return None
            # job_id may be a composite "<job_id>:job_group_id:<group_id>" string.
            if 'job_group_id' in job_id:
                ids = job_id.split(":job_group_id:")
                job_id, _ = int(ids[0]), int(ids[1])
            ret = self.get(f'data/job/{job_id}', {})
            if 'result_url' in ret and len(ret['result_url']) != 0:
                return ret.get('result_url')
            else:
                return None
        except ValueError as e:
            dlog.error(e)
            return None
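
# --- Illustrative usage sketch (not part of the upstream module) ---
# A minimal end-to-end walk-through of the client above: log in, upload a task
# archive to OSS, submit a job, then fetch the result once available. The
# credentials, endpoint, bucket, paths, and job settings below are hypothetical
# placeholders, not values defined anywhere in this module.
def _example_usage():
    api = API(email="user@example.com", password="********")  # hypothetical account
    # Upload the local task archive to the object key expected by the server.
    api.upload(
        oss_task_zip="dpcloudserver/job.zip",          # hypothetical OSS object key
        zip_task_file="./job.zip",                     # hypothetical local archive
        endpoint="https://oss-example.aliyuncs.com",   # hypothetical OSS endpoint
        bucket_name="example-bucket",                  # hypothetical bucket name
    )
    # Submit the job; job_create returns the job id and its group id.
    job_id, group_id = api.job_create(
        job_type="container",                          # hypothetical job type
        oss_path="dpcloudserver/job.zip",
        input_data={
            "command": "bash run.sh",                  # hypothetical command
            "backward_files": ["log", "output"],       # hypothetical result files
        },
    )
    # Look the task up in its group and download the result archive if it is ready.
    task = api.get_tasks(job_id, group_id)
    result_url = api.get_job_result_url(str(job_id))
    if task is not None and result_url:
        api.download_from_url(result_url, "./result.zip")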