# %%
import asyncio
import copy
import functools
import json
import os
import pathlib
import random
import time
import uuid
from hashlib import sha1
from typing import List, Optional
import yaml
from dargs.dargs import Argument, Variant
from dpdispatcher.dlog import dlog
from dpdispatcher.machine import Machine
from dpdispatcher.utils.job_status import JobStatus
from dpdispatcher.utils.record import record
# %%
default_strategy = dict(if_cuda_multi_devices=False, ratio_unfinished=0.0)
[docs]
class Submission:
"""A submission represents a collection of tasks.
These tasks usually locate at a common directory.
And these Tasks may share common files to be uploaded and downloaded.
Parameters
----------
work_base : Path
the base directory of the local tasks. It is usually the dir name of project .
machine : Machine
machine class object (for example, PBS, Slurm, Shell) to execute the jobs.
The machine can still be bound after the instantiation with the bind_submission method.
resources : Resources
the machine resources (cpu or gpu) used to generate the slurm/pbs script
forward_common_files : list
the common files to be uploaded to other computers before the jobs begin
backward_common_files : list
the common files to be downloaded from other computers after the jobs finish
task_list : list of Task
a list of tasks to be run.
"""
def __init__(
self,
work_base,
machine=None,
resources=None,
forward_common_files=[],
backward_common_files=[],
*,
task_list=[],
):
# self.submission_list = submission_list
self.local_root = None
self.work_base = work_base
self._abs_work_base = os.path.abspath(work_base)
self.resources = resources
self.forward_common_files = (
sorted(forward_common_files)
if isinstance(forward_common_files, list)
else forward_common_files
)
self.backward_common_files = (
sorted(backward_common_files)
if isinstance(backward_common_files, list)
else backward_common_files
)
self.submission_hash = None
# warning: can not remote .copy() or there will be bugs
# self.belonging_tasks = task_list
self.belonging_tasks = task_list.copy()
self.belonging_jobs = list()
self.bind_machine(machine)
def __repr__(self):
return json.dumps(self.serialize(), indent=4)
def __eq__(self, other):
"""When check whether the two submission are equal,
we disregard the runtime infomation(job_state, job_id, fail_count) of the submission.belonging_jobs.
"""
return json.dumps(self.serialize(if_static=True)) == json.dumps(
other.serialize(if_static=True)
)
def __getitem__(self, key):
return self.serialize()[key]
[docs]
@classmethod
def deserialize(cls, submission_dict, machine=None):
"""Convert the submission_dict to a Submission class object.
Parameters
----------
submission_dict : dict
path-like, the base directory of the local tasks
machine : Machine
Machine class Object to execute the jobs
Returns
-------
submission : Submission
the Submission class instance converted from the submission_dict
"""
submission = cls(
work_base=submission_dict["work_base"],
resources=Resources.deserialize(
resources_dict=submission_dict["resources"]
),
forward_common_files=submission_dict["forward_common_files"],
backward_common_files=submission_dict["backward_common_files"],
)
submission.belonging_jobs = [
Job.deserialize(job_dict=job_dict)
for job_dict in submission_dict["belonging_jobs"]
]
submission.submission_hash = submission.get_hash()
if machine is not None:
submission.bind_machine(machine=machine)
else:
machine = Machine.deserialize(machine_dict=submission_dict["machine"])
submission.bind_machine(machine)
return submission
[docs]
def serialize(self, if_static=False):
"""Convert the Submission class instance to a dictionary.
Parameters
----------
if_static : bool
whether dump the job runtime infomation (like job_id, job_state, fail_count) to the dictionary.
Returns
-------
submission_dict : dict
the dictionary converted from the Submission class instance
"""
assert self.resources is not None
submission_dict = {}
# if if_none_local_root:
# submission_dict['local_root'] = None
# else:
# submission_dict['local_root'] = self.local_root
submission_dict["work_base"] = self.work_base
submission_dict["_abs_work_base"] = self._abs_work_base
machine = getattr(self, "machine", None)
if machine is None:
submission_dict["machine"] = {}
else:
submission_dict["machine"] = machine.serialize()
submission_dict["resources"] = self.resources.serialize()
submission_dict["forward_common_files"] = self.forward_common_files
submission_dict["backward_common_files"] = self.backward_common_files
submission_dict["belonging_jobs"] = [
job.serialize(if_static=if_static) for job in self.belonging_jobs
]
return submission_dict
[docs]
def register_task(self, task):
if self.belonging_jobs:
raise RuntimeError(
"Not allowed to register tasks after generating jobs. "
f"submission hash error {self}"
)
self.belonging_tasks.append(task)
[docs]
def register_task_list(self, task_list):
if self.belonging_jobs:
raise RuntimeError(
"Not allowed to register tasks after generating jobs. "
f"submission hash error {self}"
)
self.belonging_tasks.extend(task_list)
[docs]
def get_hash(self):
return sha1(
json.dumps(self.serialize(if_static=True)).encode("utf-8")
).hexdigest()
[docs]
def bind_machine(self, machine):
"""Bind this submission to a machine. update the machine's context remote_root and local_root.
Parameters
----------
machine : Machine
the machine to bind with
"""
self.submission_hash = self.get_hash()
self.machine = machine
for job in self.belonging_jobs:
job.machine = machine
if machine is not None:
self.machine.context.bind_submission(self)
self.local_root = machine.context.temp_local_root
return self
[docs]
def run_submission(
self, *, dry_run=False, exit_on_submit=False, clean=True, check_interval=30
):
"""Main method to execute the submission.
First, check whether old Submission exists on the remote machine, and try to recover from it.
Second, upload the local files to the remote machine where the tasks to be executed.
Third, run the submission defined previously.
Forth, wait until the tasks in the submission finished and download the result file to local directory.
If dry_run is True, submission will be uploaded but not be executed and exit.
If exit_on_submit is True, submission will exit.
"""
assert self.resources is not None
if not self.belonging_jobs:
self.generate_jobs()
self.try_recover_from_json()
self.update_submission_state()
if self.check_all_finished():
dlog.info("info:check_all_finished: True")
else:
dlog.info("info:check_all_finished: False")
self.upload_jobs()
if dry_run is True:
dlog.info(f"submission succeeded: {self.submission_hash}")
dlog.info(f"at {self.machine.context.remote_root}")
return self.serialize()
self.handle_unexpected_submission_state()
self.submission_to_json()
time.sleep(1)
self.update_submission_state()
self.check_all_finished()
self.handle_unexpected_submission_state()
ratio_unfinished = self.resources.strategy["ratio_unfinished"]
while not self.check_all_finished():
if exit_on_submit is True:
dlog.info(f"submission succeeded: {self.submission_hash}")
dlog.info(f"at {self.machine.context.remote_root}")
return self.serialize()
if ratio_unfinished > 0.0 and self.check_ratio_unfinished(ratio_unfinished):
self.remove_unfinished_tasks()
break
try:
time.sleep(check_interval)
except (Exception, KeyboardInterrupt, SystemExit) as e:
self.submission_to_json()
record_path = record.write(self)
dlog.exception(e)
dlog.info(f"submission exit: {self.submission_hash}")
dlog.info(f"at {self.machine.context.remote_root}")
dlog.info(f"Submission information is saved in {str(record_path)}.")
dlog.debug(self.serialize())
raise e
else:
self.update_submission_state()
self.handle_unexpected_submission_state()
finally:
pass
self.handle_unexpected_submission_state()
self.try_download_result()
self.submission_to_json()
if clean:
self.clean_jobs()
return self.serialize()
[docs]
def try_download_result(self):
start_time = time.time()
retry_interval = 60 # retry every 1 minute
success = False
while not success:
try:
self.download_jobs()
success = True
except FileNotFoundError as e:
# retry will never success if the file is not found
raise e
except (EOFError, Exception) as e:
dlog.exception(e)
elapsed_time = time.time() - start_time
if elapsed_time < 3600: # in 1 h
dlog.info("Retrying in 1 minute...")
time.sleep(retry_interval)
elif elapsed_time < 86400: # 1 h ~ 24 h
retry_interval = 600 # retry every 10 min
dlog.info("Retrying in 10 minutes...")
time.sleep(retry_interval)
else: # > 24 h
dlog.info("Maximum retries time reached. Exiting.")
break
[docs]
async def async_run_submission(self, **kwargs):
"""Async interface of run_submission.
Examples
--------
>>> import asyncio
>>> from dpdispacher import Machine, Resource, Submission
>>> async def run_jobs():
... backgroud_task = set()
... # task1
... task1 = Task(...)
... submission1 = Submission(..., task_list=[task1])
... background_task = asyncio.create_task(
... submission1.async_run_submission(check_interval=2, clean=False)
... )
... # task2
... task2 = Task(...)
... submission2 = Submission(..., task_list=[task1])
... background_task = asyncio.create_task(
... submission2.async_run_submission(check_interval=2, clean=False)
... )
... background_tasks.add(background_task)
... result = await asyncio.gather(*background_tasks)
... return result
>>> run_jobs()
May raise Error if pass `clean=True` explicitly when submit to pbs or slurm.
"""
kwargs = {**{"clean": False}, **kwargs}
if kwargs["clean"]:
dlog.warning(
"Using async submission with `clean=True`, "
"job may fail in queue system"
)
loop = asyncio.get_event_loop()
wrapped_submission = functools.partial(self.run_submission, **kwargs)
return await loop.run_in_executor(None, wrapped_submission)
[docs]
def update_submission_state(self):
"""Check whether all the jobs in the submission.
Notes
-----
this method will not handle unexpected (like resubmit terminated) job state in the submission.
"""
for job in self.belonging_jobs:
if job.job_state == JobStatus.finished:
# finished job will be finished for ever, skip
continue
job.get_job_state()
dlog.debug(
f"debug:update_submission_state: job: {job.job_hash}, {job.job_id}, {job.job_state}"
)
[docs]
def handle_unexpected_submission_state(self):
"""Handle unexpected job state of the submission.
If the job state is unsubmitted, submit the job.
If the job state is terminated (killed unexpectly), resubmit the job.
If the job state is unknown, raise an error.
"""
try:
for job in self.belonging_jobs:
job.handle_unexpected_job_state()
except Exception as e:
self.submission_to_json()
record_path = record.write(self)
raise RuntimeError(
f"Meet errors will handle unexpected submission state.\n"
f"Debug information: remote_root=={self.machine.context.remote_root}.\n"
f"Debug information: submission_hash=={self.submission_hash}.\n"
f"Please check error messages above and in remote_root. "
f"The submission information is saved in {str(record_path)}.\n"
f"For furthur actions, run the following command with proper flags: dpdisp submission {self.submission_hash}"
) from e
[docs]
def check_ratio_unfinished(self, ratio_unfinished: float) -> bool:
"""Calculate the ratio of unfinished tasks in the submission.
Parameters
----------
ratio_unfinished : float
the ratio of unfinished tasks in the submission
Returns
-------
bool
whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished
"""
assert self.resources is not None
if self.resources.group_size == 1:
# if group size is 1, calculate job state is enough and faster
status_list = [job.job_state for job in self.belonging_jobs]
else:
# get task state is more accurate
status_list = []
for task in self.belonging_tasks:
task.get_task_state(self.machine.context)
status_list.append(task.task_state)
finished_num = status_list.count(JobStatus.finished)
return finished_num / len(self.belonging_tasks) >= (1 - ratio_unfinished)
[docs]
def remove_unfinished_tasks(self):
dlog.info("Remove unfinished tasks")
# kill all jobs and mark them as finished
for job in self.belonging_jobs:
if job.job_state != JobStatus.finished:
self.machine.kill(job)
job.job_state = JobStatus.finished
# remove all unfinished tasks
finished_tasks = []
for task in self.belonging_tasks:
if task.task_state == JobStatus.finished:
finished_tasks.append(task)
# there is no need to remove actual remote directory
# as it should be cleaned anyway
self.belonging_tasks = finished_tasks
# clean removed tasks in jobs - although this should not be necessary
for job in self.belonging_jobs:
job.job_task_list = [
task
for task in job.job_task_list
if task.task_state == JobStatus.finished
]
[docs]
def check_all_finished(self):
"""Check whether all the jobs in the submission.
Notes
-----
This method will not handle unexpected job state in the submission.
"""
# self.update_submission_state()
if any(
(job.job_state in [JobStatus.terminated, JobStatus.unknown])
for job in self.belonging_jobs
):
self.submission_to_json()
if any(
(
job.job_state
in [
JobStatus.running,
JobStatus.waiting,
JobStatus.unsubmitted,
JobStatus.completing,
JobStatus.terminated,
JobStatus.unknown,
]
)
for job in self.belonging_jobs
):
return False
else:
return True
[docs]
def generate_jobs(self):
"""After tasks register to the self.belonging_tasks,
This method generate the jobs and add these jobs to self.belonging_jobs.
The jobs are generated by the tasks randomly, and there are self.resources.group_size tasks in a task.
Why we randomly shuffle the tasks is under the consideration of load balance.
The random seed is a constant (to be concrete, 42). And this insures that the jobs are equal when we re-run the program.
"""
assert self.resources is not None
if self.belonging_jobs:
raise RuntimeError(
f"Can not generate jobs when submission.belonging_jobs is not empty. debug:{self}"
)
group_size = self.resources.group_size
if (group_size < 0) or (not isinstance(group_size, int)):
raise RuntimeError("group_size must be a positive number")
task_num = len(self.belonging_tasks)
if task_num == 0:
raise RuntimeError("submission must have at least 1 task")
if group_size == 0:
# 0 means infinity
group_size = task_num
random.seed(42)
random_task_index = list(range(task_num))
random.shuffle(random_task_index)
random_task_index_ll = [
random_task_index[ii : ii + group_size]
for ii in range(0, task_num, group_size)
]
for ii in random_task_index_ll:
job_task_list = [self.belonging_tasks[jj] for jj in ii]
job = Job(
job_task_list=job_task_list,
machine=self.machine,
resources=copy.deepcopy(self.resources),
)
self.belonging_jobs.append(job)
if self.machine is not None:
self.bind_machine(self.machine)
self.submission_hash = self.get_hash()
[docs]
def upload_jobs(self):
self.machine.context.upload(self)
[docs]
def download_jobs(self):
self.machine.context.download(self)
# for job in self.belonging_jobs:
# job.tag_finished()
# self.machine.context.write_file(self.machine.finish_tag_name, write_str="")
[docs]
def clean_jobs(self):
self.machine.context.clean()
assert self.submission_hash is not None
record.remove(self.submission_hash)
[docs]
def submission_to_json(self):
# self.update_submission_state()
write_str = json.dumps(self.serialize(), indent=4, default=str)
submission_file_name = f"{self.submission_hash}.json"
self.machine.context.write_file(submission_file_name, write_str=write_str)
[docs]
@classmethod
def submission_from_json(cls, json_file_name="submission.json"):
with open(json_file_name) as f:
submission_dict = json.load(f)
# submission_dict = machine.context.read_file(json_file_name)
submission = cls.deserialize(submission_dict=submission_dict, machine=None)
return submission
# def check_if_recover()
[docs]
def try_recover_from_json(self):
submission_file_name = f"{self.submission_hash}.json"
if_recover = self.machine.context.check_file_exists(submission_file_name)
submission = None
submission_dict = {}
if if_recover:
submission_dict_str = self.machine.context.read_file(
fname=submission_file_name
)
submission_dict = json.loads(submission_dict_str)
submission = Submission.deserialize(submission_dict=submission_dict)
submission.bind_machine(machine=self.machine)
if self == submission:
self.belonging_jobs = submission.belonging_jobs
self.belonging_tasks = [
task for job in self.belonging_jobs for task in job.job_task_list
]
self.bind_machine(machine=self.machine)
dlog.info(
f"Find old submission; recover submission from json file;"
f"submission.submission_hash:{submission.submission_hash}; "
f"machine.context.remote_root:{self.machine.context.remote_root}; "
f"submission.work_base:{submission.work_base};"
)
# self = submission.bind_machine(machine=self.machine)
else:
print(self.serialize())
print(submission.serialize())
raise RuntimeError("Recover failed.")
[docs]
class Task:
"""A task is a sequential command to be executed,
as well as the files it depends on to transmit forward and backward.
Parameters
----------
command : Str
the command to be executed.
task_work_path : Path
the directory of each file where the files are dependent on.
forward_files : list of Path
the files to be transmitted to remote machine before the command execute.
backward_files : list of Path
the files to be transmitted from remote machine after the comand finished.
outlog : Str
the filename to which command redirect stdout
errlog : Str
the filename to which command redirect stderr
"""
def __init__(
self,
command,
task_work_path,
forward_files=[],
backward_files=[],
outlog="log",
errlog="err",
):
self.command = command
self.task_work_path = task_work_path
self.forward_files = forward_files
self.backward_files = backward_files
self.outlog = outlog
self.errlog = errlog
# self.task_need_resources = task_need_resources
self.task_hash = self.get_hash()
# self.task_need_resources="<to be completed in the future>"
# self.uuid =
self.task_state = JobStatus.unsubmitted
def __repr__(self):
return str(self.serialize())
def __eq__(self, other):
return json.dumps(self.serialize()) == json.dumps(other.serialize())
def __getitem__(self, key):
return self.serialize()[key]
[docs]
def get_hash(self):
return sha1(json.dumps(self.serialize()).encode("utf-8")).hexdigest()
[docs]
@classmethod
def load_from_json(cls, json_file):
with open(json_file) as f:
task_dict = json.load(f)
return cls.load_from_dict(task_dict)
[docs]
@classmethod
def load_from_yaml(cls, yaml_file):
with open(yaml_file) as f:
task_dict = yaml.safe_load(f)
task = cls.load_from_dict(task_dict=task_dict)
return task
[docs]
@classmethod
def load_from_dict(cls, task_dict: dict) -> "Task":
# check dict
base = cls.arginfo()
task_dict = base.normalize_value(task_dict, trim_pattern="_*")
base.check_value(task_dict, strict=False)
task = cls.deserialize(task_dict=task_dict)
return task
[docs]
@classmethod
def deserialize(cls, task_dict):
"""Convert the task_dict to a Task class object.
Parameters
----------
task_dict : dict
the dictionary which contains the task information
Returns
-------
task : Task
the Task class instance converted from the task_dict
"""
task = cls(**task_dict)
return task
[docs]
def serialize(self):
task_dict = {}
task_dict["command"] = self.command
task_dict["task_work_path"] = self.task_work_path
task_dict["forward_files"] = self.forward_files
task_dict["backward_files"] = self.backward_files
task_dict["outlog"] = self.outlog
task_dict["errlog"] = self.errlog
# task_dict['task_need_resources'] = self.task_need_resources
return task_dict
[docs]
@staticmethod
def arginfo():
doc_command = (
"A command to be executed of this task. The expected return code is 0."
)
doc_task_work_path = "The dir where the command to be executed."
doc_forward_files = (
"The files to be uploaded in task_work_path before the task exectued."
)
doc_backward_files = "The files to be download to local_root in task_work_path after the task finished"
doc_outlog = "The out log file name. redirect from stdout"
doc_errlog = "The err log file name. redirect from stderr"
task_args = [
Argument("command", str, optional=False, doc=doc_command),
Argument("task_work_path", str, optional=False, doc=doc_task_work_path),
Argument(
"forward_files",
List[str],
optional=True,
doc=doc_forward_files,
default=[],
),
Argument(
"backward_files",
List[str],
optional=True,
doc=doc_backward_files,
default=[],
),
Argument(
"outlog",
[type(None), str],
optional=True,
doc=doc_outlog,
default="log",
),
Argument(
"errlog",
[type(None), str],
optional=True,
doc=doc_errlog,
default="err",
),
]
task_format = Argument("task", dict, task_args)
return task_format
[docs]
def get_task_state(self, context):
"""Get the task state by checking the tag file.
Parameters
----------
context : Context
the context of the task
"""
if self.task_state in (JobStatus.finished, JobStatus.unsubmitted):
# finished task should always be finished
# unsubmitted task do not need to check tag
return
# check tag
task_tag_finished = (
pathlib.PurePath(self.task_work_path)
/ (self.task_hash + "_task_tag_finished")
).as_posix()
result = context.check_file_exists(task_tag_finished)
if result:
self.task_state = JobStatus.finished
[docs]
class Job:
"""Job is generated by Submission automatically.
A job ususally has many tasks and it may request computing resources from job scheduler systems.
Each Job can generate a script file to be submitted to the job scheduler system or executed locally.
Parameters
----------
job_task_list : list of Task
the tasks belonging to the job
resources : Resources
the machine resources. Passed from Submission when it constructs jobs.
machine : machine
machine object to execute the job. Passed from Submission when it constructs jobs.
"""
def __init__(
self,
job_task_list,
*,
resources,
machine=None,
):
self.job_task_list = job_task_list
# self.job_work_base = job_work_base
self.resources = resources
self.machine = machine
self.job_state = None # JobStatus.unsubmitted
self.job_id = ""
self.fail_count = 0
self.job_uuid = uuid.uuid4()
# self.job_hash = self.get_hash()
self.job_hash = self.get_hash()
self.script_file_name = self.job_hash + ".sub"
def __repr__(self):
return str(self.serialize())
def __eq__(self, other):
"""When check whether the two jobs are equal,
we disregard the runtime infomation(job_state, job_id, fail_count) of the jobs.
"""
return json.dumps(self.serialize(if_static=True)) == json.dumps(
other.serialize(if_static=True)
)
[docs]
@classmethod
def deserialize(cls, job_dict, machine=None):
"""Convert the job_dict to a Submission class object.
Parameters
----------
job_dict : dict
the dictionary which contains the job information
machine : Machine
the machine object to execute the job
Returns
-------
submission : Job
the Job class instance converted from the job_dict
"""
if len(job_dict.keys()) != 1:
raise RuntimeError(
f"json file may be broken, len(job_dict.keys()) must be 1. {job_dict}"
)
job_hash = list(job_dict.keys())[0]
job_task_list = [
Task.deserialize(task_dict)
for task_dict in job_dict[job_hash]["job_task_list"]
]
job = Job(
job_task_list=job_task_list,
resources=Resources.deserialize(
resources_dict=job_dict[job_hash]["resources"]
),
machine=machine,
)
# job.job_runtime_info=job_dict[job_hash]['job_runtime_info']
job.job_state = job_dict[job_hash]["job_state"]
job.job_id = job_dict[job_hash]["job_id"]
job.fail_count = job_dict[job_hash]["fail_count"]
# job.job_uuid = job_dict[job_hash]['job_uuid']
for task in job.job_task_list:
task.task_state = job.job_state
return job
[docs]
def get_job_state(self):
"""Get the jobs. Usually, this method will query the database of slurm or pbs job scheduler system and get the results.
Notes
-----
this method will not submit or resubmit the jobs if the job is unsubmitted.
"""
dlog.debug(
f"debug:query database; self.job_hash:{self.job_hash}; self.job_id:{self.job_id}"
)
assert self.machine is not None
job_state = self.machine.check_status(self)
self.job_state = job_state
# update general task_state, which should be faster than checking tags
for task in self.job_task_list:
# only update if the task is not finished
if task.task_state != JobStatus.finished:
task.task_state = job_state
[docs]
def handle_unexpected_job_state(self):
job_state = self.job_state
if job_state == JobStatus.unknown:
raise RuntimeError(f"job_state for job {self} is unknown")
if job_state == JobStatus.terminated:
self.fail_count += 1
dlog.info(
f"job: {self.job_hash} {self.job_id} terminated; "
f"fail_cout is {self.fail_count}; resubmitting job"
)
retry_count = 3
assert self.machine is not None
if hasattr(self.machine, "retry_count") and self.machine.retry_count >= 0:
retry_count = self.machine.retry_count + 1
if (self.fail_count) > 0 and (self.fail_count % retry_count == 0):
last_error_message = self.get_last_error_message()
err_msg = (
f"job:{self.job_hash} {self.job_id} failed {self.fail_count} times."
)
if last_error_message is not None:
err_msg += f"\nPossible remote error message: {last_error_message}"
raise RuntimeError(err_msg)
self.submit_job()
if self.job_state != JobStatus.unsubmitted:
dlog.info(
f"job:{self.job_hash} re-submit after terminated; new job_id is {self.job_id}"
)
time.sleep(0.2)
self.get_job_state()
dlog.info(
f"job:{self.job_hash} job_id:{self.job_id} after re-submitting; the state now is {repr(self.job_state)}"
)
self.handle_unexpected_job_state()
if self.resources.wait_time != 0:
time.sleep(self.resources.wait_time)
if job_state == JobStatus.unsubmitted:
dlog.debug(f"job: {self.job_hash} unsubmitted; submit it")
# if self.fail_count > 3:
# raise RuntimeError("job:job {job} failed 3 times".format(job=self))
self.submit_job()
if self.job_state != JobStatus.unsubmitted:
dlog.info(f"job: {self.job_hash} submit; job_id is {self.job_id}")
if self.resources.wait_time != 0:
time.sleep(self.resources.wait_time)
# self.get_job_state()
[docs]
def get_hash(self):
return str(list(self.serialize(if_static=True).keys())[0])
[docs]
def serialize(self, if_static=False):
"""Convert the Task class instance to a dictionary.
Parameters
----------
if_static : bool
whether dump the job runtime infomation (job_id, job_state, fail_count, job_uuid etc.) to the dictionary.
Returns
-------
task_dict : dict
the dictionary converted from the Task class instance
"""
job_content_dict = {}
# for task in self.job_task_list:
job_content_dict["job_task_list"] = [
task.serialize() for task in self.job_task_list
]
job_content_dict["resources"] = self.resources.serialize()
# job_content_dict['job_work_base'] = self.job_work_base
job_hash = sha1(json.dumps(job_content_dict).encode("utf-8")).hexdigest()
if not if_static:
job_content_dict["job_state"] = self.job_state
job_content_dict["job_id"] = self.job_id
job_content_dict["fail_count"] = self.fail_count
# job_content_dict['job_uuid'] = self.job_uuid
return {job_hash: job_content_dict}
[docs]
def register_job_id(self, job_id):
self.job_id = job_id
[docs]
def submit_job(self):
assert self.machine is not None
job_id = self.machine.do_submit(self)
self.register_job_id(job_id)
if job_id:
self.job_state = JobStatus.waiting
else:
self.job_state = JobStatus.unsubmitted
[docs]
def job_to_json(self):
write_str = json.dumps(self.serialize(), indent=2, default=str)
assert self.machine is not None
self.machine.context.write_file(
self.job_hash + "_job.json", write_str=write_str
)
[docs]
def get_last_error_message(self) -> Optional[str]:
"""Get last error message when the job is terminated."""
assert self.machine is not None
last_err_file = self.job_hash + "_last_err_file"
if self.machine.context.check_file_exists(last_err_file):
last_error_message = self.machine.context.read_file(last_err_file)
# red color
last_error_message = "\033[31m" + last_error_message + "\033[0m"
return last_error_message
[docs]
class Resources:
"""Resources is used to describe the machine resources we need to do calculations.
Parameters
----------
number_node : int
The number of node need for each `job`.
cpu_per_node : int
cpu numbers of each node.
gpu_per_node : int
gpu numbers of each node.
queue_name : str
The queue name of batch job scheduler system.
group_size : int
The number of `tasks` in a `job`.
custom_flags : list of Str
The extra lines pass to job submitting script header
strategy : dict
strategies we use to generation job submitting scripts.
if_cuda_multi_devices : bool
If there are multiple nvidia GPUS on the node, and we want to assign the tasks to different GPUS.
If true, dpdispatcher will manually export environment variable CUDA_VISIBLE_DEVICES to different task.
Usually, this option will be used with Task.task_need_resources variable simultaneously.
ratio_unfinished : float
The ratio of `task` that can be unfinished.
customized_script_header_template_file : str
The customized template file to generate job submitting script header,
which overrides the default file.
para_deg : int
Decide how many tasks will be run in parallel.
Usually run with `strategy['if_cuda_multi_devices']`
source_list : list of Path
The env file to be sourced before the command execution.
wait_time : int
The waitting time in second after a single task submitted. Default: 0.
"""
def __init__(
self,
number_node,
cpu_per_node,
gpu_per_node,
queue_name,
group_size,
*,
custom_flags=[],
strategy=default_strategy,
para_deg=1,
module_unload_list=[],
module_purge=False,
module_list=[],
source_list=[],
envs={},
prepend_script=[],
append_script=[],
wait_time=0,
**kwargs,
):
self.number_node = number_node
self.cpu_per_node = cpu_per_node
self.gpu_per_node = gpu_per_node
self.queue_name = queue_name
self.group_size = group_size
# self.extra_specification = extra_specification
self.custom_flags = custom_flags
self.strategy = strategy
self.para_deg = para_deg
self.module_purge = module_purge
self.module_unload_list = module_unload_list
self.module_list = module_list
self.source_list = source_list
self.envs = envs
self.prepend_script = prepend_script
self.append_script = append_script
self.wait_time = wait_time
# self.if_cuda_multi_devices = if_cuda_multi_devices
self.kwargs = kwargs.get("kwargs", kwargs)
self.gpu_in_use = 0
self.task_in_para = 0
# self. = 0
# if self.gpu_per_node > 1:
# self.in_para_task_num = 0
for kk, value in default_strategy.items():
self.strategy.setdefault(kk, value)
if self.strategy["if_cuda_multi_devices"] is True:
if gpu_per_node < 1:
raise RuntimeError(
"gpu_per_node can not be smaller than 1 when if_cuda_multi_devices is True"
)
if number_node != 1:
raise RuntimeError(
"number_node must be 1 when if_cuda_multi_devices is True"
)
if self.strategy["ratio_unfinished"] >= 1.0:
raise RuntimeError("ratio_unfinished must be smaller than 1.0")
def __eq__(self, other):
return json.dumps(self.serialize()) == json.dumps(other.serialize())
[docs]
def serialize(self):
resources_dict = {}
resources_dict["number_node"] = self.number_node
resources_dict["cpu_per_node"] = self.cpu_per_node
resources_dict["gpu_per_node"] = self.gpu_per_node
resources_dict["queue_name"] = self.queue_name
resources_dict["group_size"] = self.group_size
resources_dict["custom_flags"] = self.custom_flags
resources_dict["strategy"] = self.strategy
resources_dict["para_deg"] = self.para_deg
resources_dict["module_purge"] = self.module_purge
resources_dict["module_unload_list"] = self.module_unload_list
resources_dict["module_list"] = self.module_list
resources_dict["source_list"] = self.source_list
resources_dict["envs"] = self.envs
resources_dict["prepend_script"] = self.prepend_script
resources_dict["append_script"] = self.append_script
resources_dict["wait_time"] = self.wait_time
resources_dict["kwargs"] = self.kwargs
return resources_dict
[docs]
@classmethod
def deserialize(cls, resources_dict):
resources = cls(
number_node=resources_dict.get("number_node", 1),
cpu_per_node=resources_dict.get("cpu_per_node", 1),
gpu_per_node=resources_dict.get("gpu_per_node", 0),
queue_name=resources_dict.get("queue_name", ""),
group_size=resources_dict["group_size"],
custom_flags=resources_dict.get("custom_flags", []),
strategy=resources_dict.get("strategy", default_strategy),
para_deg=resources_dict.get("para_deg", 1),
module_purge=resources_dict.get("module_purge", False),
module_unload_list=resources_dict.get("module_unload_list", []),
module_list=resources_dict.get("module_list", []),
source_list=resources_dict.get("source_list", []),
envs=resources_dict.get("envs", {}),
prepend_script=resources_dict.get("prepend_script", []),
append_script=resources_dict.get("append_script", []),
wait_time=resources_dict.get("wait_time", 0),
**resources_dict.get("kwargs", {}),
)
return resources
def __getitem__(self, key):
return self.serialize()[key]
[docs]
@classmethod
def load_from_json(cls, json_file):
with open(json_file) as f:
resources_dict = json.load(f)
resources = cls.load_from_dict(resources_dict=resources_dict)
return resources
[docs]
@classmethod
def load_from_yaml(cls, yaml_file):
with open(yaml_file) as f:
resources_dict = yaml.safe_load(f)
resources = cls.load_from_dict(resources_dict=resources_dict)
return resources
[docs]
@classmethod
def load_from_dict(cls, resources_dict):
# check dict
base = cls.arginfo(detail_kwargs="batch_type" in resources_dict)
resources_dict = base.normalize_value(resources_dict, trim_pattern="_*")
base.check_value(resources_dict, strict=False)
return cls.deserialize(resources_dict=resources_dict)
[docs]
@staticmethod
def arginfo(detail_kwargs=True):
doc_number_node = "The number of node need for each `job`"
doc_cpu_per_node = "cpu numbers of each node assigned to each job."
doc_gpu_per_node = "gpu numbers of each node assigned to each job."
doc_queue_name = "The queue name of batch job scheduler system."
doc_group_size = "The number of `tasks` in a `job`. 0 means infinity."
doc_custom_flags = "The extra lines pass to job submitting script header"
doc_para_deg = "Decide how many tasks will be run in parallel."
doc_source_list = "The env file to be sourced before the command execution."
doc_module_purge = (
"Remove all modules on HPC system before module load (module_list)"
)
doc_module_unload_list = (
"The modules to be unloaded on HPC system before submitting jobs"
)
doc_module_list = (
"The modules to be loaded on HPC system before submitting jobs"
)
doc_envs = "The environment variables to be exported on before submitting jobs"
doc_prepend_script = "Optional script run before jobs submitted."
doc_append_script = "Optional script run after jobs submitted."
doc_wait_time = "The waitting time in second after a single `task` submitted"
doc_if_cuda_multi_devices = (
"If there are multiple nvidia GPUS on the node, and we want to assign the tasks to different GPUS."
"If true, dpdispatcher will manually export environment variable CUDA_VISIBLE_DEVICES to different task."
"Usually, this option will be used with Task.task_need_resources variable simultaneously."
)
doc_ratio_unfinished = "The ratio of `tasks` that can be unfinished."
doc_customized_script_header_template_file = (
"The customized template file to generate job submitting script header, "
"which overrides the default file."
)
strategy_args = [
Argument(
"if_cuda_multi_devices",
bool,
optional=True,
default=False,
doc=doc_if_cuda_multi_devices,
),
Argument(
"ratio_unfinished",
float,
optional=True,
default=0.0,
doc=doc_ratio_unfinished,
),
Argument(
"customized_script_header_template_file",
str,
optional=True,
doc=doc_customized_script_header_template_file,
),
]
doc_strategy = "strategies we use to generation job submitting scripts."
strategy_format = Argument(
"strategy", dict, strategy_args, optional=True, doc=doc_strategy
)
resources_args = [
Argument("number_node", int, optional=True, doc=doc_number_node, default=1),
Argument(
"cpu_per_node", int, optional=True, doc=doc_cpu_per_node, default=1
),
Argument(
"gpu_per_node", int, optional=True, doc=doc_gpu_per_node, default=0
),
Argument("queue_name", str, optional=True, doc=doc_queue_name, default=""),
Argument("group_size", int, optional=False, doc=doc_group_size),
Argument("custom_flags", List[str], optional=True, doc=doc_custom_flags),
# Argument("strategy", dict, optional=True, doc=doc_strategy,default=default_strategy),
strategy_format,
Argument("para_deg", int, optional=True, doc=doc_para_deg, default=1),
Argument(
"source_list", List[str], optional=True, doc=doc_source_list, default=[]
),
Argument(
"module_purge", bool, optional=True, doc=doc_module_purge, default=False
),
Argument(
"module_unload_list",
List[str],
optional=True,
doc=doc_module_unload_list,
default=[],
),
Argument(
"module_list", List[str], optional=True, doc=doc_module_list, default=[]
),
Argument("envs", dict, optional=True, doc=doc_envs, default={}),
Argument(
"prepend_script",
List[str],
optional=True,
doc=doc_prepend_script,
default=[],
),
Argument(
"append_script",
List[str],
optional=True,
doc=doc_append_script,
default=[],
),
Argument(
"wait_time", [int, float], optional=True, doc=doc_wait_time, default=0
),
]
if detail_kwargs:
batch_variant = Variant(
"batch_type",
[
machine.resources_arginfo()
for machine in set(Machine.subclasses_dict.values())
],
optional=False,
doc="The batch job system type loaded from machine/batch_type.",
)
resources_format = Argument(
"resources", dict, resources_args, [batch_variant]
)
else:
resources_args.append(
Argument(
"kwargs", dict, optional=True, doc="Vary by different machines."
)
)
resources_args.append(
Argument(
"batch_type",
str,
optional=True,
doc="Allow this key when strict checking.",
)
)
resources_format = Argument("resources", dict, resources_args)
return resources_format
# %%