dpdispatcher package#
- class dpdispatcher.Job(job_task_list, *, resources, machine=None)[source]#
Bases: object
Job is generated by Submission automatically. A job usually has many tasks and it may request computing resources from job scheduler systems. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.
- Parameters:
- job_task_list : list of Task
the tasks belonging to the job
- resources : Resources
the machine resources. Passed from Submission when it constructs jobs.
- machine : Machine
machine object to execute the job. Passed from Submission when it constructs jobs.
Methods
deserialize(job_dict[, machine]): Convert the job_dict to a Job class object.
get_job_state(): Get the job state.
get_last_error_message(): Get the last error message when the job is terminated.
serialize([if_static]): Convert the Job class instance to a dictionary.
get_hash
handle_unexpected_job_state
job_to_json
register_job_id
submit_job
- classmethod deserialize(job_dict, machine=None)[source]#
Convert the job_dict to a Job class object.
- Parameters:
- job_dict : dict
the dictionary which contains the job information
- machine : Machine
the machine object to execute the job
- Returns:
- job : Job
the Job class instance converted from the job_dict
- get_job_state()[source]#
Get the job state. Usually, this method queries the database of the Slurm or PBS job scheduler system and gets the result.
Notes
This method will not submit or resubmit the job if the job is unsubmitted.
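A job can be round-tripped through serialize and deserialize. A minimal sketch, assuming job and machine objects already exist (e.g. constructed by a Submission):
>>> job_dict = job.serialize()                          # plain dict, safe to dump to JSON
>>> restored = Job.deserialize(job_dict, machine=machine)
>>> restored.get_hash() == job.get_hash()               # same tasks, same hash
True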
- class dpdispatcher.Machine(*args, **kwargs)[source]#
Bases: object
A machine is used to handle the connection with remote machines.
- Parameters:
- context : SubClass derived from BaseContext
The context is used to maintain the connection with the remote machine.
Methods
do_submit(job): Submit a single job, assuming that no job is running there.
get_exit_code(job): Get exit code of the job.
kill(job): Kill the job.
resources_arginfo(): Generate the resources arginfo.
resources_subfields(): Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- kill(job)[source]#
Kill the job.
If not implemented, pass and let the user manually kill it.
- Parameters:
- job : Job
the job to be killed
- options = {'Bohrium', 'DistributedShell', 'Fugaku', 'JH_UniScheduler', 'LSF', 'OpenAPI', 'PBS', 'SGE', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}#
- classmethod resources_arginfo() → Argument[source]#
Generate the resources arginfo.
- Returns:
- Argument
resources arginfo
- classmethod resources_subfields() → List[Argument][source]#
Generate the resources subfields.
- Returns:
- list[Argument]
resources subfields
- subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'JH_UniScheduler': <class 'dpdispatcher.machines.JH_UniScheduler.JH_UniScheduler'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'SGE': <class 'dpdispatcher.machines.pbs.SGE'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'jh_unischeduler': <class 'dpdispatcher.machines.JH_UniScheduler.JH_UniScheduler'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'sge': <class 'dpdispatcher.machines.pbs.SGE'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}#
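The options set and subclasses_dict let Machine.load_from_dict dispatch on the batch_type field (case-insensitively, with legacy aliases such as DpCloudServer and Lebesgue resolving to Bohrium). A minimal sketch of selecting a backend this way; the roots and remote_profile values are placeholders:
>>> from dpdispatcher import Machine
>>> machine = Machine.load_from_dict(
...     {
...         "batch_type": "Slurm",           # key into subclasses_dict
...         "context_type": "SSHContext",    # selects the BaseContext subclass
...         "local_root": "./",
...         "remote_root": "/scratch/user/work",      # placeholder
...         "remote_profile": {
...             "hostname": "login.example.org",      # placeholder
...             "username": "user",
...         },
...     }
... )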
- class dpdispatcher.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]#
Bases: object
Resources is used to describe the machine resources we need to do calculations.
- Parameters:
- number_node : int
The number of nodes needed for each job.
- cpu_per_node : int
The number of CPUs on each node.
- gpu_per_node : int
The number of GPUs on each node.
- queue_name : str
The queue name of the batch job scheduler system.
- group_size : int
The number of tasks in a job.
- custom_flags : list of str
The extra lines passed to the job submission script header.
- strategy : dict
The strategies used to generate job submission scripts. Sub-keys:
- if_cuda_multi_devices : bool
If true and there are multiple NVIDIA GPUs on the node, dpdispatcher will manually export the environment variable CUDA_VISIBLE_DEVICES so that tasks are assigned to different GPUs. Usually, this option is used together with the Task.task_need_resources variable.
- ratio_unfinished : float
The ratio of tasks that are allowed to be unfinished.
- customized_script_header_template_file : str
The customized template file used to generate the job submission script header, which overrides the default file.
- para_deg : int
How many tasks will be run in parallel. Usually used with strategy['if_cuda_multi_devices'].
- source_list : list of Path
The environment files to be sourced before the command execution.
- wait_time : int
The waiting time in seconds after a single task is submitted. Default: 0.
Methods
arginfo
deserialize
load_from_dict
load_from_json
load_from_yaml
serialize
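A minimal sketch of constructing Resources for a single GPU node; the queue name, header flag, and sourced file are placeholders:
>>> from dpdispatcher import Resources
>>> resources = Resources(
...     number_node=1,
...     cpu_per_node=4,
...     gpu_per_node=1,
...     queue_name="gpu_queue",                # placeholder queue name
...     group_size=5,                          # five tasks per generated job
...     custom_flags=["#SBATCH --exclusive"],  # scheduler-specific header line
...     source_list=["./env.sh"],              # placeholder env file
... )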
- class dpdispatcher.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]#
Bases: object
A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.
- Parameters:
- work_base : Path
the base directory of the local tasks. It is usually the directory name of the project.
- machine : Machine
machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after instantiation with the bind_machine method.
- resources : Resources
the machine resources (cpu or gpu) used to generate the slurm/pbs script
- forward_common_files : list
the common files to be uploaded to other computers before the jobs begin
- backward_common_files : list
the common files to be downloaded from other computers after the jobs finish
- task_list : list of Task
a list of tasks to be run.
Methods
async_run_submission(**kwargs): Async interface of run_submission.
bind_machine(machine): Bind this submission to a machine.
check_all_finished(): Check whether all the jobs in the submission have finished.
check_ratio_unfinished(ratio_unfinished): Calculate the ratio of unfinished tasks in the submission.
deserialize(submission_dict[, machine]): Convert the submission_dict to a Submission class object.
generate_jobs(): Generate jobs from the tasks registered in self.belonging_tasks and add them to self.belonging_jobs.
handle_unexpected_submission_state(): Handle unexpected job states of the submission.
run_submission(*[, dry_run, exit_on_submit, ...]): Main method to execute the submission.
serialize([if_static]): Convert the Submission class instance to a dictionary.
update_submission_state(): Check the states of all the jobs in the submission.
clean_jobs
download_jobs
get_hash
register_task
register_task_list
remove_unfinished_tasks
submission_from_json
submission_to_json
try_download_result
try_recover_from_json
upload_jobs
- async async_run_submission(**kwargs)[source]#
Async interface of run_submission.
Examples
>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> async def run_jobs():
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())
May raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.
- bind_machine(machine)[source]#
Bind this submission to a machine, and update the machine's context remote_root and local_root.
- Parameters:
- machine : Machine
the machine to bind with
- check_all_finished()[source]#
Check whether all the jobs in the submission have finished.
Notes
This method will not handle unexpected job state in the submission.
- check_ratio_unfinished(ratio_unfinished: float) → bool[source]#
Calculate the ratio of unfinished tasks in the submission.
- Parameters:
- ratio_unfinished : float
the threshold ratio of unfinished tasks in the submission
- Returns:
- bool
whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished
- classmethod deserialize(submission_dict, machine=None)[source]#
Convert the submission_dict to a Submission class object.
- Parameters:
- submission_dict : dict
the dictionary which contains the submission information
- machine : Machine
Machine class object to execute the jobs
- Returns:
- submission : Submission
the Submission class instance converted from the submission_dict
- generate_jobs()[source]#
After the tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The tasks are shuffled randomly and grouped into jobs of self.resources.group_size tasks each; the shuffling is done for load balance. The random seed is a constant (to be concrete, 42), which ensures that the generated jobs are identical when the program is re-run.
- handle_unexpected_submission_state()[source]#
Handle unexpected job states of the submission. If a job state is unsubmitted, submit the job. If a job state is terminated (killed unexpectedly), resubmit the job. If a job state is unknown, raise an error.
- run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]#
Main method to execute the submission. First, check whether an old Submission exists on the remote machine, and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission finish and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method will exit. If exit_on_submit is True, the method will return right after the jobs are submitted.
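A minimal end-to-end sketch of the workflow this method drives, assuming machine.json and resources.json (placeholder file names) describe your scheduler and resources following the Machine and Resources argument schemas above:
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> machine = Machine.load_from_json("machine.json")         # placeholder file
>>> resources = Resources.load_from_json("resources.json")   # placeholder file
>>> task = Task(
...     command="python compute.py",        # placeholder command
...     task_work_path="task_0/",           # relative to work_base
...     forward_files=["compute.py"],
...     backward_files=["result.json", "log"],
... )
>>> submission = Submission(
...     work_base="project/",
...     machine=machine,
...     resources=resources,
...     task_list=[task],
... )
>>> submission.run_submission(check_interval=30)  # blocks until all tasks finish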
- serialize(if_static=False)[source]#
Convert the Submission class instance to a dictionary.
- Parameters:
- if_static : bool
whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.
- Returns:
- submission_dict : dict
the dictionary converted from the Submission class instance
- class dpdispatcher.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]#
Bases: object
A task is a sequential command to be executed, together with the files it depends on, which are transmitted forward and backward.
- Parameters:
- command : str
the command to be executed.
- task_work_path : Path
the working directory of the task, where its dependent files are located.
- forward_files : list of Path
the files to be transmitted to the remote machine before the command executes.
- backward_files : list of Path
the files to be transmitted from the remote machine after the command finishes.
- outlog : str
the filename to which the command redirects stdout
- errlog : str
the filename to which the command redirects stderr
Methods
deserialize(task_dict): Convert the task_dict to a Task class object.
get_task_state(context): Get the task state by checking the tag file.
arginfo
get_hash
load_from_dict
load_from_json
load_from_yaml
serialize
- classmethod deserialize(task_dict)[source]#
Convert the task_dict to a Task class object.
- Parameters:
- task_dict : dict
the dictionary which contains the task information
- Returns:
- task : Task
the Task class instance converted from the task_dict
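Similarly to deserialize, a Task can be built from a plain dictionary whose keys mirror the constructor parameters. A hedged sketch; the command and paths are placeholders:
>>> from dpdispatcher import Task
>>> task = Task.load_from_dict(
...     {
...         "command": "python compute.py",   # placeholder command
...         "task_work_path": "task_0/",
...         "forward_files": ["compute.py"],
...         "backward_files": ["result.json"],
...         "outlog": "log",
...         "errlog": "err",
...     }
... )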
Subpackages#
- dpdispatcher.contexts package
- Submodules
- dpdispatcher.contexts.dp_cloud_server_context module
BohriumContext
BohriumContext.alias
BohriumContext.bind_submission()
BohriumContext.block_call()
BohriumContext.check_file_exists()
BohriumContext.check_home_file_exits()
BohriumContext.clean()
BohriumContext.download()
BohriumContext.load_from_dict()
BohriumContext.machine_subfields()
BohriumContext.read_file()
BohriumContext.read_home_file()
BohriumContext.upload()
BohriumContext.upload_job()
BohriumContext.write_file()
BohriumContext.write_home_file()
BohriumContext.write_local_file()
DpCloudServerContext
LebesgueContext
- dpdispatcher.contexts.hdfs_context module
- dpdispatcher.contexts.lazy_local_context module
LazyLocalContext
LazyLocalContext.bind_submission()
LazyLocalContext.block_call()
LazyLocalContext.call()
LazyLocalContext.check_file_exists()
LazyLocalContext.check_finish()
LazyLocalContext.clean()
LazyLocalContext.download()
LazyLocalContext.get_job_root()
LazyLocalContext.get_return()
LazyLocalContext.load_from_dict()
LazyLocalContext.read_file()
LazyLocalContext.upload()
LazyLocalContext.write_file()
SPRetObj
- dpdispatcher.contexts.local_context module
LocalContext
LocalContext.bind_submission()
LocalContext.block_call()
LocalContext.call()
LocalContext.check_file_exists()
LocalContext.check_finish()
LocalContext.clean()
LocalContext.download()
LocalContext.get_job_root()
LocalContext.get_return()
LocalContext.load_from_dict()
LocalContext.machine_subfields()
LocalContext.read_file()
LocalContext.upload()
LocalContext.write_file()
SPRetObj
- dpdispatcher.contexts.openapi_context module
OpenAPIContext
OpenAPIContext.bind_submission()
OpenAPIContext.block_call()
OpenAPIContext.check_file_exists()
OpenAPIContext.check_home_file_exits()
OpenAPIContext.clean()
OpenAPIContext.download()
OpenAPIContext.load_from_dict()
OpenAPIContext.read_file()
OpenAPIContext.read_home_file()
OpenAPIContext.upload()
OpenAPIContext.upload_job()
OpenAPIContext.write_file()
OpenAPIContext.write_home_file()
OpenAPIContext.write_local_file()
- dpdispatcher.contexts.ssh_context module
SSHContext
SSHContext.bind_submission()
SSHContext.block_call()
SSHContext.call()
SSHContext.check_file_exists()
SSHContext.check_finish()
SSHContext.clean()
SSHContext.close()
SSHContext.download()
SSHContext.get_job_root()
SSHContext.get_return()
SSHContext.list_remote_dir()
SSHContext.load_from_dict()
SSHContext.machine_subfields()
SSHContext.read_file()
SSHContext.sftp
SSHContext.ssh
SSHContext.upload()
SSHContext.write_file()
SSHSession
- dpdispatcher.dpcloudserver package
- dpdispatcher.entrypoints package
- dpdispatcher.machines package
- Submodules
- dpdispatcher.machines.JH_UniScheduler module
- dpdispatcher.machines.distributed_shell module
- dpdispatcher.machines.dp_cloud_server module
- dpdispatcher.machines.fugaku module
- dpdispatcher.machines.lsf module
- dpdispatcher.machines.openapi module
- dpdispatcher.machines.pbs module
- dpdispatcher.machines.shell module
- dpdispatcher.machines.slurm module
- dpdispatcher.utils package
Submodules#
dpdispatcher.arginfo module#
dpdispatcher.base_context module#
- class dpdispatcher.base_context.BaseContext(*args, **kwargs)[source]#
Bases: object
Methods
block_call(cmd): Run command with arguments.
block_checkcall(cmd[, asynchronously]): Run command with arguments.
machine_arginfo(): Generate the machine arginfo.
machine_subfields(): Generate the machine subfields.
bind_submission
check_finish
clean
download
load_from_dict
read_file
upload
write_file
- abstract block_call(cmd) → Tuple[int, Any, Any, Any][source]#
Run command with arguments. Wait for command to complete.
- Parameters:
- cmd : str
The command to run.
- Returns:
- exit_status
exit code
- stdin
standard input
- stdout
standard output
- stderr
standard error
- block_checkcall(cmd, asynchronously=False) → Tuple[Any, Any, Any][source]#
Run command with arguments. Wait for command to complete.
- Parameters:
- cmd : str
The command to run.
- asynchronously : bool, optional, default=False
Run command asynchronously. If True, nohup will be used to run the command.
- Returns:
- stdin
standard input
- stdout
standard output
- stderr
standard error
- Raises:
- RuntimeError
when the return code is not zero
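A hedged sketch of how a Machine implementation might use block_checkcall on its bound context to poll a scheduler; the squeue command, the job id, and the file-like .read() on stdout are illustrative assumptions:
>>> # raises RuntimeError automatically if squeue exits non-zero
>>> stdin, stdout, stderr = context.block_checkcall("squeue -h -j 4242 -o '%T'")
>>> stdout.read().decode().strip()
'RUNNING'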
- classmethod machine_arginfo() → Argument[source]#
Generate the machine arginfo.
- Returns:
- Argument
machine arginfo
- classmethod machine_subfields() → List[Argument][source]#
Generate the machine subfields.
- Returns:
- list[Argument]
machine subfields
- options = {'BohriumContext', 'HDFSContext', 'LazyLocalContext', 'LocalContext', 'OpenAPIContext', 'SSHContext'}#
- subclasses_dict = {'Bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'BohriumContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServer': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServerContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'HDFS': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'HDFSContext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'LazyLocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'LazyLocalContext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'Lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'LebesgueContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'Local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'LocalContext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'OpenAPI': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'OpenAPIContext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'SSH': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'SSHContext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'bohriumcontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudserver': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudservercontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'hdfs': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'hdfscontext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'lazylocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lazylocalcontext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'lebesguecontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'localcontext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'openapi': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'openapicontext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'ssh': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'sshcontext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>}#
dpdispatcher.dlog module#
dpdispatcher.dpdisp module#
- dpdispatcher.dpdisp.main_parser() → ArgumentParser[source]#
dpdispatcher command-line options argument parser.
- Returns:
- argparse.ArgumentParser
the argument parser
Notes
This function is used by documentation.
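A small sketch of building the parser programmatically, as the documentation generator does:
>>> from dpdispatcher.dpdisp import main_parser
>>> parser = main_parser()
>>> parser.print_help()  # lists the dpdisp subcommands and options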
dpdispatcher.machine module#
- class dpdispatcher.machine.Machine(*args, **kwargs)[source]#
Bases: object
A machine is used to handle the connection with remote machines.
- Parameters:
- context : SubClass derived from BaseContext
The context is used to maintain the connection with the remote machine.
Methods
do_submit(job): Submit a single job, assuming that no job is running there.
get_exit_code(job): Get exit code of the job.
kill(job): Kill the job.
resources_arginfo(): Generate the resources arginfo.
resources_subfields(): Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- kill(job)[source]#
Kill the job.
If not implemented, pass and let the user manually kill it.
- Parameters:
- job : Job
the job to be killed
- options = {'Bohrium', 'DistributedShell', 'Fugaku', 'JH_UniScheduler', 'LSF', 'OpenAPI', 'PBS', 'SGE', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}#
- classmethod resources_arginfo() → Argument[source]#
Generate the resources arginfo.
- Returns:
- Argument
resources arginfo
- classmethod resources_subfields() → List[Argument][source]#
Generate the resources subfields.
- Returns:
- list[Argument]
resources subfields
- subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'JH_UniScheduler': <class 'dpdispatcher.machines.JH_UniScheduler.JH_UniScheduler'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'SGE': <class 'dpdispatcher.machines.pbs.SGE'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'jh_unischeduler': <class 'dpdispatcher.machines.JH_UniScheduler.JH_UniScheduler'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'sge': <class 'dpdispatcher.machines.pbs.SGE'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}#
dpdispatcher.run module#
- dpdispatcher.run.create_submission(metadata: dict, hash: str) → Submission[source]#
Create a Submission instance from PEP 723 metadata.
- Parameters:
- metadata : dict
PEP 723 metadata.
- hash : str
Submission hash.
- Returns:
- Submission
Submission instance.
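A hedged sketch of calling create_submission; the metadata keys shown here (work_base, machine, resources) are assumptions mirroring the Submission, Machine, and Resources schemas, not a documented contract:
>>> from dpdispatcher.run import create_submission
>>> metadata = {
...     "work_base": "./",                                    # assumed key layout
...     "machine": {"batch_type": "Shell", "context_type": "LocalContext",
...                 "local_root": "./", "remote_root": "./work"},
...     "resources": {"number_node": 1, "cpu_per_node": 1, "gpu_per_node": 0,
...                   "queue_name": "", "group_size": 1},
... }
>>> submission = create_submission(metadata, hash="0123abcd")  # hash labels the run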
dpdispatcher.submission module#
- class dpdispatcher.submission.Job(job_task_list, *, resources, machine=None)[source]#
Bases: object
Job is generated by Submission automatically. A job usually has many tasks and it may request computing resources from job scheduler systems. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.
- Parameters:
- job_task_list : list of Task
the tasks belonging to the job
- resources : Resources
the machine resources. Passed from Submission when it constructs jobs.
- machine : Machine
machine object to execute the job. Passed from Submission when it constructs jobs.
Methods
deserialize(job_dict[, machine]): Convert the job_dict to a Job class object.
get_job_state(): Get the job state.
get_last_error_message(): Get the last error message when the job is terminated.
serialize([if_static]): Convert the Job class instance to a dictionary.
get_hash
handle_unexpected_job_state
job_to_json
register_job_id
submit_job
- classmethod deserialize(job_dict, machine=None)[source]#
Convert the job_dict to a Job class object.
- Parameters:
- job_dict : dict
the dictionary which contains the job information
- machine : Machine
the machine object to execute the job
- Returns:
- job : Job
the Job class instance converted from the job_dict
- get_job_state()[source]#
Get the job state. Usually, this method queries the database of the Slurm or PBS job scheduler system and gets the result.
Notes
This method will not submit or resubmit the job if the job is unsubmitted.
- class dpdispatcher.submission.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]#
Bases: object
Resources is used to describe the machine resources we need to do calculations.
- Parameters:
- number_node : int
The number of nodes needed for each job.
- cpu_per_node : int
The number of CPUs on each node.
- gpu_per_node : int
The number of GPUs on each node.
- queue_name : str
The queue name of the batch job scheduler system.
- group_size : int
The number of tasks in a job.
- custom_flags : list of str
The extra lines passed to the job submission script header.
- strategy : dict
The strategies used to generate job submission scripts. Sub-keys:
- if_cuda_multi_devices : bool
If true and there are multiple NVIDIA GPUs on the node, dpdispatcher will manually export the environment variable CUDA_VISIBLE_DEVICES so that tasks are assigned to different GPUs. Usually, this option is used together with the Task.task_need_resources variable.
- ratio_unfinished : float
The ratio of tasks that are allowed to be unfinished.
- customized_script_header_template_file : str
The customized template file used to generate the job submission script header, which overrides the default file.
- para_deg : int
How many tasks will be run in parallel. Usually used with strategy['if_cuda_multi_devices'].
- source_list : list of Path
The environment files to be sourced before the command execution.
- wait_time : int
The waiting time in seconds after a single task is submitted. Default: 0.
Methods
arginfo
deserialize
load_from_dict
load_from_json
load_from_yaml
serialize
- class dpdispatcher.submission.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]#
Bases: object
A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.
- Parameters:
- work_base : Path
the base directory of the local tasks. It is usually the directory name of the project.
- machine : Machine
machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after instantiation with the bind_machine method.
- resources : Resources
the machine resources (cpu or gpu) used to generate the slurm/pbs script
- forward_common_files : list
the common files to be uploaded to other computers before the jobs begin
- backward_common_files : list
the common files to be downloaded from other computers after the jobs finish
- task_list : list of Task
a list of tasks to be run.
Methods
async_run_submission(**kwargs): Async interface of run_submission.
bind_machine(machine): Bind this submission to a machine.
check_all_finished(): Check whether all the jobs in the submission have finished.
check_ratio_unfinished(ratio_unfinished): Calculate the ratio of unfinished tasks in the submission.
deserialize(submission_dict[, machine]): Convert the submission_dict to a Submission class object.
generate_jobs(): Generate jobs from the tasks registered in self.belonging_tasks and add them to self.belonging_jobs.
handle_unexpected_submission_state(): Handle unexpected job states of the submission.
run_submission(*[, dry_run, exit_on_submit, ...]): Main method to execute the submission.
serialize([if_static]): Convert the Submission class instance to a dictionary.
update_submission_state(): Check the states of all the jobs in the submission.
clean_jobs
download_jobs
get_hash
register_task
register_task_list
remove_unfinished_tasks
submission_from_json
submission_to_json
try_download_result
try_recover_from_json
upload_jobs
- async async_run_submission(**kwargs)[source]#
Async interface of run_submission.
Examples
>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> async def run_jobs():
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())
May raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.
- bind_machine(machine)[source]#
Bind this submission to a machine, and update the machine's context remote_root and local_root.
- Parameters:
- machine : Machine
the machine to bind with
- check_all_finished()[source]#
Check whether all the jobs in the submission have finished.
Notes
This method will not handle unexpected job state in the submission.
- check_ratio_unfinished(ratio_unfinished: float) → bool[source]#
Calculate the ratio of unfinished tasks in the submission.
- Parameters:
- ratio_unfinished : float
the threshold ratio of unfinished tasks in the submission
- Returns:
- bool
whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished
- classmethod deserialize(submission_dict, machine=None)[source]#
Convert the submission_dict to a Submission class object.
- Parameters:
- submission_dict : dict
the dictionary which contains the submission information
- machine : Machine
Machine class object to execute the jobs
- Returns:
- submission : Submission
the Submission class instance converted from the submission_dict
- generate_jobs()[source]#
After the tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The tasks are shuffled randomly and grouped into jobs of self.resources.group_size tasks each; the shuffling is done for load balance. The random seed is a constant (to be concrete, 42), which ensures that the generated jobs are identical when the program is re-run.
- handle_unexpected_submission_state()[source]#
Handle unexpected job states of the submission. If a job state is unsubmitted, submit the job. If a job state is terminated (killed unexpectedly), resubmit the job. If a job state is unknown, raise an error.
- run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]#
Main method to execute the submission. First, check whether an old Submission exists on the remote machine, and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission finish and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method will exit. If exit_on_submit is True, the method will return right after the jobs are submitted.
- serialize(if_static=False)[source]#
Convert the Submission class instance to a dictionary.
- Parameters:
- if_static : bool
whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.
- Returns:
- submission_dict : dict
the dictionary converted from the Submission class instance
- class dpdispatcher.submission.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]#
Bases: object
A task is a sequential command to be executed, together with the files it depends on, which are transmitted forward and backward.
- Parameters:
- command : str
the command to be executed.
- task_work_path : Path
the working directory of the task, where its dependent files are located.
- forward_files : list of Path
the files to be transmitted to the remote machine before the command executes.
- backward_files : list of Path
the files to be transmitted from the remote machine after the command finishes.
- outlog : str
the filename to which the command redirects stdout
- errlog : str
the filename to which the command redirects stderr
Methods
deserialize(task_dict): Convert the task_dict to a Task class object.
get_task_state(context): Get the task state by checking the tag file.
arginfo
get_hash
load_from_dict
load_from_json
load_from_yaml
serialize
- classmethod deserialize(task_dict)[source]#
Convert the task_dict to a Task class object.
- Parameters:
- task_dict : dict
the dictionary which contains the task information
- Returns:
- task : Task
the Task class instance converted from the task_dict