DPDispatcher’s documentation
DPDispatcher is a Python package used to generate input scripts for HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver), submit these scripts to HPC systems, and poke the jobs until they finish.
DPDispatcher will monitor (poke) these jobs until they finish and download the result files (if the jobs are running on remote systems connected by SSH).
Install DPDispatcher
DPDispatcher can be installed with pip:
pip install dpdispatcher
To add Bohrium support, execute
pip install dpdispatcher[bohrium]
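To verify the installation, one can import the package in Python (a quick check; the __version__ attribute is assumed to be available):
import dpdispatcher

# if the import succeeds, the installation works; __version__ is assumed to exist
print(dpdispatcher.__version__)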
Getting Started
DPDispatcher provides the following classes:
- Task class, which represents a command to be run on the batch job system, as well as the essential files needed by the command.
- Submission class, which represents a collection of jobs defined by the HPC system. There may be common files to be uploaded by them. DPDispatcher will create and submit these jobs when a submission instance executes the run_submission method. This method will poke until the jobs finish and return.
- Job class, a class used by the Submission class, which represents a job on the HPC system. Submission will automatically generate the jobs' submitting scripts used by HPC systems from the Task and Resources.
- Resources class, which represents the computing resources for each job within a submission.
You can use DPDispatcher in a Python script to submit five tasks:
from dpdispatcher import Machine, Resources, Task, Submission
machine = Machine.load_from_json("machine.json")
resources = Resources.load_from_json("resources.json")
task0 = Task.load_from_json("task.json")
task1 = Task(
command="cat example.txt",
task_work_path="dir1/",
forward_files=["example.txt"],
backward_files=["out.txt"],
outlog="out.txt",
)
task2 = Task(
command="cat example.txt",
task_work_path="dir2/",
forward_files=["example.txt"],
backward_files=["out.txt"],
outlog="out.txt",
)
task3 = Task(
command="cat example.txt",
task_work_path="dir3/",
forward_files=["example.txt"],
backward_files=["out.txt"],
outlog="out.txt",
)
task4 = Task(
command="cat example.txt",
task_work_path="dir4/",
forward_files=["example.txt"],
backward_files=["out.txt"],
outlog="out.txt",
)
task_list = [task0, task1, task2, task3, task4]
submission = Submission(
work_base="lammps_md_300K_5GPa/",
machine=machine,
resources=resources,
task_list=task_list,
forward_common_files=["graph.pb"],
backward_common_files=[],
)
submission.run_submission()
where machine.json
is
{
"batch_type": "Slurm",
"context_type": "SSHContext",
"local_root": "/home/user123/workplace/22_new_project/",
"remote_root": "/home/user123/dpdispatcher_work_dir/",
"remote_profile": {
"hostname": "39.106.xx.xxx",
"username": "user123",
"port": 22,
"timeout": 10
}
}
resources.json
is
{
"number_node": 1,
"cpu_per_node": 4,
"gpu_per_node": 1,
"queue_name": "GPUV100",
"group_size": 5
}
and task.json
is
{
"command": "lmp -i input.lammps",
"task_work_path": "bct-0/",
"forward_files": ["conf.lmp", "input.lammps"],
"backward_files": ["log.lammps"],
"outlog": "log",
"errlog": "err"
}
You may also submit multiple GPU jobs, as in the following complex resources example:
resources = Resources(
number_node=1,
cpu_per_node=4,
gpu_per_node=2,
queue_name="GPU_2080Ti",
group_size=4,
custom_flags=["#SBATCH --nice=100", "#SBATCH --time=24:00:00"],
strategy={
# used when you want to add CUDA_VISIBLE_DEVICES automatically
"if_cuda_multi_devices": True
},
para_deg=1,
# will unload these modules before running tasks
module_unload_list=["singularity"],
# will load these modules before running tasks
module_list=["singularity/3.0.0"],
# will source the environment files before running tasks
source_list=["./slurm_test.env"],
# the envs option is used to export environment variables
# And it will generate a line like below.
# export DP_DISPATCHER_EXPORT=test_foo_bar_baz
envs={"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"},
)
The details of parameters can be found in Machine Parameters, Resources Parameters, and Task Parameters.
Supported contexts
Context is the way to connect to the remote server. One needs to set context_type
to one of the following values:
LazyLocal
context_type
: LazyLocal
LazyLocal
directly runs jobs in the local server and local directory.
Since bash -l
is used in the shebang line of the submission scripts, the login shell startup files will be executed, potentially overriding the current environmental variables. Therefore, it’s advisable to explicitly set the environmental variables using envs
or source_list
.
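A minimal sketch of a LazyLocal machine definition, assuming it is built from a Python dict via Machine.load_from_dict (the batch type and local path are placeholders):
from dpdispatcher import Machine

# a sketch: run Shell jobs directly in the current local directory
machine = Machine.load_from_dict(
    {
        "batch_type": "Shell",
        "context_type": "LazyLocal",
        "local_root": "./",
    }
)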
Local
context_type
: Local
Local
runs jobs in the local server, but in a different directory. Files will be copied to the remote directory before jobs start and copied back after jobs finish.
Since bash -l
is used in the shebang line of the submission scripts, the login shell startup files will be executed, potentially overriding the current environmental variables. Therefore, it’s advisable to explicitly set the environmental variables using envs
or source_list
.
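A similar sketch for the Local context, which additionally needs a remote_root where the files are copied (the batch type and paths are placeholders):
from dpdispatcher import Machine

# a sketch: copy files to a scratch directory on the same server and run there
machine = Machine.load_from_dict(
    {
        "batch_type": "Slurm",
        "context_type": "Local",
        "local_root": "./",
        "remote_root": "/tmp/dpdispatcher_workdir/",  # placeholder scratch directory
    }
)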
SSH
context_type
: SSH
SSH
runs jobs in a remote server. Files will be copied to the remote directory via SSH channels before jobs start and copied back after jobs finish. To use SSH, one needs to provide necessary parameters in remote_profile
, such as username
and hostname
.
It’s suggested to generate SSH keys and transfer the public key to the remote server in advance, which is more secure than password authentication.
Note that SSH
context is non-login, so bash_profile
files will not be executed outside the submission script.
Bohrium
context_type
: Bohrium
Bohrium is the cloud platform for scientific computing. Read Bohrium documentation for details. To use Bohrium, one needs to provide necessary parameters in remote_profile
.
HDFS
context_type
: HDFS
The Hadoop Distributed File System (HDFS) is a distributed file system. Read Support DPDispatcher on Yarn for details.
OpenAPI
context_type
: OpenAPI
OpenAPI is a new way to submit jobs to Bohrium. It uses an AccessKey instead of a username and password. Read Bohrium documentation for details. To use OpenAPI, one needs to provide necessary parameters in remote_profile
.
Supported batch job systems
Batch job system is a system to process batch jobs. One needs to set batch_type
to one of the following values:
Bash
batch_type
: Shell
When batch_type
is set to Shell
, dpdispatcher will generate a bash script to process jobs. No extra packages are required for Shell
.
Due to the lack of a scheduling system, Shell runs all jobs at the same time. To avoid running multiple jobs at the same time, one could set group_size
to 0 (which means infinity) to generate only one job with multiple tasks, as sketched below.
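A sketch of such a Resources setup (the CPU count is illustrative; the queue name is not used by Shell):
from dpdispatcher import Resources

resources = Resources(
    number_node=1,
    cpu_per_node=4,
    gpu_per_node=0,
    queue_name="",   # not used by Shell
    group_size=0,    # 0 means infinity: all tasks are grouped into a single job
)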
Slurm
batch_type
: Slurm
, SlurmJobArray
Slurm is a job scheduling system used by lots of HPCs. One needs to make sure slurm has been setup in the remote server and the related environment is activated.
When SlurmJobArray
is used, dpdispatcher submits Slurm jobs with job arrays. In this way, several dpdispatcher task
s map to a Slurm job and a dpdispatcher job
maps to a Slurm job array. Millions of Slurm jobs can be submitted quickly and Slurm can execute all Slurm jobs at the same time. One can use group_size
and slurm_job_size
to control how many Slurm jobs are contained in a Slurm job array.
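A sketch of a resources definition for this mode, assuming the machine's batch_type is SlurmJobArray and the dict is loaded with Resources.load_from_dict (the numbers are illustrative):
from dpdispatcher import Resources

resources = Resources.load_from_dict(
    {
        "number_node": 1,
        "cpu_per_node": 4,
        "gpu_per_node": 1,
        "queue_name": "GPUV100",
        "group_size": 100,  # tasks per dpdispatcher job, i.e. per Slurm job array
        # batch-specific arguments; slurm_job_size is the number of tasks per Slurm job
        "kwargs": {"slurm_job_size": 2},
    }
)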
OpenPBS or PBSPro
batch_type
: PBS
OpenPBS is an open-source job scheduling system of the Linux Foundation and PBS Professional is its commercial counterpart. One needs to make sure OpenPBS has been setup in the remote server and the related environment is activated.
Note: do not use PBS
for Torque.
TORQUE
batch_type
: Torque
The Terascale Open-source Resource and QUEue Manager (TORQUE) is a distributed resource manager based on standard OpenPBS. However, not all OpenPBS flags are still supported in TORQUE. One needs to make sure TORQUE has been setup in the remote server and the related environment is activated.
LSF
batch_type
: LSF
IBM Spectrum LSF Suites is a comprehensive workload management solution used by HPCs. One needs to make sure LSF has been setup in the remote server and the related environment is activated.
Bohrium
batch_type
: Bohrium
Bohrium is the cloud platform for scientific computing. Read Bohrium documentation for details.
DistributedShell
batch_type
: DistributedShell
DistributedShell
is used to submit yarn jobs. Read Support DPDispatcher on Yarn for details.
Fugaku
batch_type
: Fugaku
Fujitsu cloud service is a job scheduling system used by Fujitsu's HPCs such as Fugaku, ITO and K computer. It should be noted that although the same job scheduling system is used, there are some differences in the details, so the Fugaku class cannot be directly used for other HPCs.
Read Fujitsu cloud service documentation for details.
OpenAPI
batch_type
: OpenAPI
OpenAPI is a new way to submit jobs to Bohrium. It uses an AccessKey instead of a username and password. Read Bohrium documentation for details.
SGE
batch_type
: SGE
The Sun Grid Engine (SGE) scheduler is a batch-queueing system for distributed resource management. The commands and flags of SGE share a lot of similarity with PBS, except when checking job status. Use this value when submitting jobs to an SGE-based batch system.
Machine parameters
Note
One can load, modify, and export the input file by using our effective web-based tool DP-GUI online or hosted using the command line interface dpdisp gui
. All parameters below can be set in DP-GUI. By clicking “SAVE JSON”, one can download the input file.
- machine:
- type:
dict
argument path:machine
- batch_type:
- type:
str
argument path:machine/batch_type
The batch job system type. Option: LSF, PBS, Torque, Slurm, Shell, Bohrium, Fugaku, OpenAPI, SGE, DistributedShell, SlurmJobArray
- local_root:
- type:
str
|NoneType
argument path:machine/local_root
The dir where the tasks and related files are located. Typically the project dir.
- remote_root:
- type:
str
|NoneType
, optionalargument path:machine/remote_root
The dir where the tasks are executed on the remote machine. Only needed when context is not lazy-local.
- clean_asynchronously:
- type:
bool
, optional, default:False
argument path:machine/clean_asynchronously
Clean the remote directory asynchronously after the job finishes.
Depending on the value of context_type, different sub args are accepted.
- context_type:
- type:
str
(flag key)argument path:machine/context_type
possible choices:SSHContext
,LocalContext
,HDFSContext
,OpenAPIContext
,LazyLocalContext
,BohriumContext
The connection used to remote machine. Option: SSHContext, OpenAPIContext, HDFSContext, LazyLocalContext, LocalContext, BohriumContext
When context_type is set to
SSHContext
(or its aliasessshcontext
,SSH
,ssh
):- remote_profile:
- type:
dict
argument path:machine[SSHContext]/remote_profile
The information used to maintain the connection with remote machine.
- hostname:
- type:
str
argument path:machine[SSHContext]/remote_profile/hostname
hostname or ip of ssh connection.
- username:
- type:
str
argument path:machine[SSHContext]/remote_profile/username
username of target linux system
- password:
- type:
str
, optionalargument path:machine[SSHContext]/remote_profile/password
(deprecated) password of linux system. Please use SSH keys instead to improve security.
- port:
- type:
int
, optional, default:22
argument path:machine[SSHContext]/remote_profile/port
ssh connection port.
- key_filename:
- type:
str
|NoneType
, optional, default:None
argument path:machine[SSHContext]/remote_profile/key_filename
key filename used by ssh connection. If left None, find key in ~/.ssh or use password for login
- passphrase:
- type:
str
|NoneType
, optional, default:None
argument path:machine[SSHContext]/remote_profile/passphrase
passphrase of key used by ssh connection
- timeout:
- type:
int
, optional, default:10
argument path:machine[SSHContext]/remote_profile/timeout
timeout of ssh connection
- totp_secret:
- type:
str
|NoneType
, optional, default:None
argument path:machine[SSHContext]/remote_profile/totp_secret
Time-based one time password secret. It should be a base32-encoded string extracted from the 2D code.
- tar_compress:
- type:
bool
, optional, default:True
argument path:machine[SSHContext]/remote_profile/tar_compress
The archive will be compressed in upload and download if it is True. If not, compression will be skipped.
- look_for_keys:
- type:
bool
, optional, default:True
argument path:machine[SSHContext]/remote_profile/look_for_keys
enable searching for discoverable private key files in ~/.ssh/
When context_type is set to
LocalContext
(or its aliaseslocalcontext
,Local
,local
):- remote_profile:
- type:
dict
, optionalargument path:machine[LocalContext]/remote_profile
The information used to maintain the connection with remote machine. This field is empty for this context.
When context_type is set to
HDFSContext
(or its aliaseshdfscontext
,HDFS
,hdfs
):- remote_profile:
- type:
dict
, optionalargument path:machine[HDFSContext]/remote_profile
The information used to maintain the connection with remote machine. This field is empty for this context.
When context_type is set to
OpenAPIContext
(or its aliasesopenapicontext
,OpenAPI
,openapi
):- remote_profile:
- type:
dict
, optionalargument path:machine[OpenAPIContext]/remote_profile
The information used to maintain the connection with remote machine. This field is empty for this context.
When context_type is set to
LazyLocalContext
(or its aliaseslazylocalcontext
,LazyLocal
,lazylocal
):- remote_profile:
- type:
dict
, optionalargument path:machine[LazyLocalContext]/remote_profile
The information used to maintain the connection with remote machine. This field is empty for this context.
When context_type is set to
BohriumContext
(or its aliasesbohriumcontext
,Bohrium
,bohrium
,DpCloudServerContext
,dpcloudservercontext
,DpCloudServer
,dpcloudserver
,LebesgueContext
,lebesguecontext
,Lebesgue
,lebesgue
):- remote_profile:
- type:
dict
argument path:machine[BohriumContext]/remote_profile
The information used to maintain the connection with remote machine.
- email:
- type:
str
, optionalargument path:machine[BohriumContext]/remote_profile/email
Email
- password:
- type:
str
, optionalargument path:machine[BohriumContext]/remote_profile/password
Password
- program_id:
- type:
int
, alias: project_idargument path:machine[BohriumContext]/remote_profile/program_id
Program ID
- retry_count:
- type:
NoneType
|int
, optional, default:2
argument path:machine[BohriumContext]/remote_profile/retry_count
The retry count when a job is terminated
- ignore_exit_code:
- type:
bool
, optional, default:True
argument path:machine[BohriumContext]/remote_profile/ignore_exit_code
The job state will be marked as finished if the exit code is non-zero when set to True. Otherwise, the job state will be designated as terminated.
- keep_backup:
- type:
bool
, optionalargument path:machine[BohriumContext]/remote_profile/keep_backup
keep download and upload zip
- input_data:
- type:
dict
argument path:machine[BohriumContext]/remote_profile/input_data
Configuration of job
Resources parameters
Note
One can load, modify, and export the input file by using our effective web-based tool DP-GUI online or hosted using the command line interface dpdisp gui
. All parameters below can be set in DP-GUI. By clicking “SAVE JSON”, one can download the input file.
- resources:
- type:
dict
argument path:resources
- number_node:
- type:
int
, optional, default:1
argument path:resources/number_node
The number of nodes needed for each job
- cpu_per_node:
- type:
int
, optional, default:1
argument path:resources/cpu_per_node
The number of CPUs of each node assigned to each job.
- gpu_per_node:
- type:
int
, optional, default:0
argument path:resources/gpu_per_node
The number of GPUs of each node assigned to each job.
- queue_name:
- type:
str
, optional, default: (empty string)argument path:resources/queue_name
The queue name of batch job scheduler system.
- group_size:
- type:
int
argument path:resources/group_size
The number of tasks in a job. 0 means infinity.
- custom_flags:
- type:
typing.List[str]
, optionalargument path:resources/custom_flags
The extra lines passed to the job submitting script header
- strategy:
- type:
dict
, optionalargument path:resources/strategy
Strategies used to generate job submitting scripts.
- if_cuda_multi_devices:
- type:
bool
, optional, default:False
argument path:resources/strategy/if_cuda_multi_devices
If there are multiple Nvidia GPUs on the node and we want to assign the tasks to different GPUs. If true, dpdispatcher will manually export the environment variable CUDA_VISIBLE_DEVICES to different tasks. Usually, this option is used together with the Task.task_need_resources variable.
- ratio_unfinished:
- type:
float
, optional, default:0.0
argument path:resources/strategy/ratio_unfinished
The ratio of tasks that can be unfinished.
- customized_script_header_template_file:
- type:
str
, optionalargument path:resources/strategy/customized_script_header_template_file
The customized template file to generate job submitting script header, which overrides the default file.
- para_deg:
- type:
int
, optional, default:1
argument path:resources/para_deg
Decide how many tasks will be run in parallel.
- source_list:
- type:
typing.List[str]
, optional, default:[]
argument path:resources/source_list
The env file to be sourced before the command execution.
- module_purge:
- type:
bool
, optional, default:False
argument path:resources/module_purge
Remove all modules on HPC system before module load (module_list)
- module_unload_list:
- type:
typing.List[str]
, optional, default:[]
argument path:resources/module_unload_list
The modules to be unloaded on HPC system before submitting jobs
- module_list:
- type:
typing.List[str]
, optional, default:[]
argument path:resources/module_list
The modules to be loaded on HPC system before submitting jobs
- envs:
- type:
dict
, optional, default:{}
argument path:resources/envs
The environment variables to be exported before submitting jobs
- prepend_script:
- type:
typing.List[str]
, optional, default:[]
argument path:resources/prepend_script
Optional script run before jobs are submitted.
- append_script:
- type:
typing.List[str]
, optional, default:[]
argument path:resources/append_script
Optional script run after jobs are submitted.
- wait_time:
- type:
float
|int
, optional, default:0
argument path:resources/wait_time
The waiting time in seconds after a single task is submitted
Depending on the value of batch_type, different sub args are accepted.
- batch_type:
When batch_type is set to
Slurm
(or its aliasslurm
):- kwargs:
- type:
dict
, optionalargument path:resources[Slurm]/kwargs
Extra arguments.
- custom_gpu_line:
- type:
str
|NoneType
, optional, default:None
argument path:resources[Slurm]/kwargs/custom_gpu_line
Custom GPU configuration, starting with #SBATCH
When batch_type is set to
Bohrium
(or its aliasesbohrium
,Lebesgue
,lebesgue
,DpCloudServer
,dpcloudserver
):- kwargs:
- type:
dict
, optionalargument path:resources[Bohrium]/kwargs
This field is empty for this batch.
When batch_type is set to
DistributedShell
(or its aliasdistributedshell
):- kwargs:
- type:
dict
, optionalargument path:resources[DistributedShell]/kwargs
This field is empty for this batch.
When batch_type is set to
OpenAPI
(or its aliasopenapi
):- kwargs:
- type:
dict
, optionalargument path:resources[OpenAPI]/kwargs
This field is empty for this batch.
When batch_type is set to
LSF
(or its aliaslsf
):- kwargs:
- type:
dict
argument path:resources[LSF]/kwargs
Extra arguments.
- gpu_usage:
- type:
bool
, optional, default:False
argument path:resources[LSF]/kwargs/gpu_usage
Choose whether GPUs are used in the calculation step.
- gpu_new_syntax:
- type:
bool
, optional, default:False
argument path:resources[LSF]/kwargs/gpu_new_syntax
For LSF >= 10.1.0.3, the new option -gpu for #BSUB can be used. If False, the old syntax is used.
- gpu_exclusive:
- type:
bool
, optional, default:True
argument path:resources[LSF]/kwargs/gpu_exclusive
Only takes effect when the new syntax is enabled. Controls whether tasks are submitted in exclusive mode for GPU.
- custom_gpu_line:
- type:
str
|NoneType
, optional, default:None
argument path:resources[LSF]/kwargs/custom_gpu_line
Custom GPU configuration, starting with #BSUB
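A sketch of an LSF GPU resources definition, assuming the machine's batch_type is LSF and the dict is loaded with Resources.load_from_dict (the queue name and counts are placeholders):
from dpdispatcher import Resources

resources = Resources.load_from_dict(
    {
        "number_node": 1,
        "cpu_per_node": 4,
        "gpu_per_node": 1,
        "queue_name": "gpu_queue",  # placeholder queue name
        "group_size": 1,
        # batch-specific arguments for LSF
        "kwargs": {
            "gpu_usage": True,
            "gpu_new_syntax": True,  # requires LSF >= 10.1.0.3
            "gpu_exclusive": True,
        },
    }
)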
When batch_type is set to
SGE
(or its aliassge
):- kwargs:
- type:
dict
, optionalargument path:resources[SGE]/kwargs
This field is empty for this batch.
When batch_type is set to
Torque
(or its aliastorque
):- kwargs:
- type:
dict
, optionalargument path:resources[Torque]/kwargs
This field is empty for this batch.
When batch_type is set to
PBS
(or its aliaspbs
):- kwargs:
- type:
dict
, optionalargument path:resources[PBS]/kwargs
This field is empty for this batch.
When batch_type is set to
Fugaku
(or its aliasfugaku
):- kwargs:
- type:
dict
, optionalargument path:resources[Fugaku]/kwargs
This field is empty for this batch.
When batch_type is set to
SlurmJobArray
(or its aliasslurmjobarray
):- kwargs:
- type:
dict
, optionalargument path:resources[SlurmJobArray]/kwargs
Extra arguments.
- custom_gpu_line:
- type:
str
|NoneType
, optional, default:None
argument path:resources[SlurmJobArray]/kwargs/custom_gpu_line
Custom GPU configuration, starting with #SBATCH
- slurm_job_size:
- type:
int
, optional, default:1
argument path:resources[SlurmJobArray]/kwargs/slurm_job_size
Number of tasks in a Slurm job
When batch_type is set to
Shell
(or its aliasshell
):- kwargs:
- type:
dict
, optionalargument path:resources[Shell]/kwargs
This field is empty for this batch.
Task parameters
Note
One can load, modify, and export the input file by using our effective web-based tool DP-GUI online or hosted using the command line interface dpdisp gui
. All parameters below can be set in DP-GUI. By clicking “SAVE JSON”, one can download the input file.
- task:
- type:
dict
argument path:task
- command:
- type:
str
argument path:task/command
The command to be executed for this task. The expected return code is 0.
- task_work_path:
- type:
str
argument path:task/task_work_path
The dir where the command is to be executed.
- forward_files:
- type:
typing.List[str]
, optional, default:[]
argument path:task/forward_files
The files to be uploaded in task_work_path before the task is executed.
- backward_files:
- type:
typing.List[str]
, optional, default:[]
argument path:task/backward_files
The files to be downloaded to local_root in task_work_path after the task finishes
- outlog:
- type:
str
|NoneType
, optional, default:log
argument path:task/outlog
The out log file name, redirected from stdout
- errlog:
- type:
str
|NoneType
, optional, default:err
argument path:task/errlog
The err log file name, redirected from stderr
Command line interface
dpdispatcher: Generate HPC scheduler systems jobs input scripts, submit these scripts to HPC systems, and poke until they finish
usage: dpdisp [-h] {submission,gui} ...
Valid subcommands
- command
Possible choices: submission, gui
Sub-commands
submission
Handle terminated submission.
dpdisp submission [-h] [--download-terminated-log] [--download-finished-task]
[--clean] [--reset-fail-count]
SUBMISSION_HASH
Positional Arguments
- SUBMISSION_HASH
Submission hash to download.
Actions
One or more actions to take on submission.
- --download-terminated-log
Download log files of terminated tasks.
Default: False
- --download-finished-task
Download finished tasks.
Default: False
- --clean
Clean submission.
Default: False
- --reset-fail-count
Reset fail count of all jobs to zero.
Default: False
gui
Serve DP-GUI.
dpdisp gui [-h] [-p PORT] [--bind_all]
Named Arguments
- -p, --port
The port to serve DP-GUI on.
Default: 6042
- --bind_all
Serve on all public interfaces. This will expose your DP-GUI instance to the network on both IPv4 and IPv6 (where available).
Default: False
DPDispatcher API
dpdispatcher package
- class dpdispatcher.Job(job_task_list, *, resources, machine=None)[source]
Bases:
object
Job is generated by Submission automatically. A job usually has many tasks and it may request computing resources from job scheduler systems. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.
- Parameters:
- job_task_listlist of Task
the tasks belonging to the job
- resourcesResources
the machine resources. Passed from Submission when it constructs jobs.
- machinemachine
machine object to execute the job. Passed from Submission when it constructs jobs.
Methods
deserialize
(job_dict[, machine])Convert the job_dict to a Job class object.
Get the job state.
Get last error message when the job is terminated.
serialize
([if_static])Convert the Job class instance to a dictionary.
get_hash
handle_unexpected_job_state
job_to_json
register_job_id
submit_job
- classmethod deserialize(job_dict, machine=None)[source]
Convert the job_dict to a Job class object.
- Parameters:
- job_dictdict
the dictionary which contains the job information
- machineMachine
the machine object to execute the job
- Returns:
- submissionJob
the Job class instance converted from the job_dict
- get_job_state()[source]
Get the job state. Usually, this method will query the database of the slurm or pbs job scheduler system and get the results.
Notes
this method will not submit or resubmit the jobs if the job is unsubmitted.
- class dpdispatcher.Machine(*args, **kwargs)[source]
Bases:
object
A machine is used to handle the connection with remote machines.
- Parameters:
- contextSubClass derived from BaseContext
The context is used to maintain the connection with the remote machine.
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
Generate the resources arginfo.
Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- kill(job)[source]
Kill the job.
If not implemented, pass and let the user manually kill it.
- Parameters:
- jobJob
job
- options = {'Bohrium', 'DistributedShell', 'Fugaku', 'LSF', 'OpenAPI', 'PBS', 'SGE', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}
- classmethod resources_arginfo() Argument [source]
Generate the resources arginfo.
- Returns:
- Argument
resources arginfo
- classmethod resources_subfields() List[Argument] [source]
Generate the resources subfields.
- Returns:
- list[Argument]
resources subfields
- subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'SGE': <class 'dpdispatcher.machines.pbs.SGE'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'sge': <class 'dpdispatcher.machines.pbs.SGE'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}
- class dpdispatcher.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]
Bases:
object
Resources is used to describe the machine resources we need to do calculations.
- Parameters:
- number_nodeint
The number of node need for each job.
- cpu_per_nodeint
cpu numbers of each node.
- gpu_per_nodeint
gpu numbers of each node.
- queue_namestr
The queue name of batch job scheduler system.
- group_sizeint
The number of tasks in a job.
- custom_flagslist of Str
The extra lines pass to job submitting script header
- strategydict
Strategies used to generate job submitting scripts. if_cuda_multi_devices : bool
If there are multiple Nvidia GPUs on the node and we want to assign the tasks to different GPUs. If true, dpdispatcher will manually export the environment variable CUDA_VISIBLE_DEVICES to different tasks. Usually, this option is used together with the Task.task_need_resources variable.
- ratio_unfinishedfloat
The ratio of tasks that can be unfinished.
- customized_script_header_template_filestr
The customized template file to generate job submitting script header, which overrides the default file.
- para_degint
Decide how many tasks will be run in parallel. Usually run with strategy[‘if_cuda_multi_devices’]
- source_listlist of Path
The env file to be sourced before the command execution.
- wait_timeint
The waiting time in seconds after a single task is submitted. Default: 0.
Methods
arginfo
deserialize
load_from_dict
load_from_json
load_from_yaml
serialize
- class dpdispatcher.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]
Bases:
object
A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.
- Parameters:
- work_basePath
the base directory of the local tasks. It is usually the dir name of the project.
- machineMachine
machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after the instantiation with the bind_submission method.
- resourcesResources
the machine resources (cpu or gpu) used to generate the slurm/pbs script
- forward_common_fileslist
the common files to be uploaded to other computers before the jobs begin
- backward_common_fileslist
the common files to be downloaded from other computers after the jobs finish
- task_listlist of Task
a list of tasks to be run.
Methods
async_run_submission
(**kwargs)Async interface of run_submission.
bind_machine
(machine)Bind this submission to a machine.
Check whether all the jobs in the submission have finished.
check_ratio_unfinished
(ratio_unfinished)Calculate the ratio of unfinished tasks in the submission.
deserialize
(submission_dict[, machine])Convert the submission_dict to a Submission class object.
After tasks register to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs.
Handle unexpected job state of the submission.
run_submission
(*[, dry_run, exit_on_submit, ...])Main method to execute the submission.
serialize
([if_static])Convert the Submission class instance to a dictionary.
Check whether all the jobs in the submission.
clean_jobs
download_jobs
get_hash
register_task
register_task_list
remove_unfinished_tasks
submission_from_json
submission_to_json
try_download_result
try_recover_from_json
upload_jobs
- async async_run_submission(**kwargs)[source]
Async interface of run_submission.
Examples
>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> async def run_jobs():
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())
May raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.
- bind_machine(machine)[source]
Bind this submission to a machine. Update the machine's context remote_root and local_root.
- Parameters:
- machineMachine
the machine to bind with
- check_all_finished()[source]
Check whether all the jobs in the submission have finished.
Notes
This method will not handle unexpected job state in the submission.
- check_ratio_unfinished(ratio_unfinished: float) bool [source]
Calculate the ratio of unfinished tasks in the submission.
- Parameters:
- ratio_unfinishedfloat
the ratio of unfinished tasks in the submission
- Returns:
- bool
whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished
- classmethod deserialize(submission_dict, machine=None)[source]
Convert the submission_dict to a Submission class object.
- Parameters:
- submission_dictdict
the dictionary which contains the submission information
- machineMachine
Machine class Object to execute the jobs
- Returns:
- submissionSubmission
the Submission class instance converted from the submission_dict
- generate_jobs()[source]
After tasks register to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The tasks are grouped into jobs randomly, with self.resources.group_size tasks in each job. The tasks are shuffled randomly for the sake of load balance. The random seed is a constant (to be concrete, 42), which ensures that the generated jobs are identical when we re-run the program.
- handle_unexpected_submission_state()[source]
Handle unexpected job state of the submission. If the job state is unsubmitted, submit the job. If the job state is terminated (killed unexpectedly), resubmit the job. If the job state is unknown, raise an error.
- run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]
Main method to execute the submission. First, check whether an old Submission exists on the remote machine and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission finish and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method exits. If exit_on_submit is True, the method exits right after the jobs are submitted.
- serialize(if_static=False)[source]
Convert the Submission class instance to a dictionary.
- Parameters:
- if_staticbool
whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.
- Returns:
- submission_dictdict
the dictionary converted from the Submission class instance
- class dpdispatcher.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]
Bases:
object
A task is a sequential command to be executed, together with the files it depends on, which are transmitted forward and backward.
- Parameters:
- commandStr
the command to be executed.
- task_work_pathPath
the working directory of the task, where its dependent files are located.
- forward_fileslist of Path
the files to be transmitted to the remote machine before the command executes.
- backward_fileslist of Path
the files to be transmitted from the remote machine after the command finishes.
- outlogStr
the filename to which command redirect stdout
- errlogStr
the filename to which command redirect stderr
Methods
deserialize
(task_dict)Convert the task_dict to a Task class object.
get_task_state
(context)Get the task state by checking the tag file.
arginfo
get_hash
load_from_dict
load_from_json
load_from_yaml
serialize
- classmethod deserialize(task_dict)[source]
Convert the task_dict to a Task class object.
- Parameters:
- task_dictdict
the dictionary which contains the task information
- Returns:
- taskTask
the Task class instance converted from the task_dict
Subpackages
dpdispatcher.contexts package
Contexts.
Submodules
dpdispatcher.contexts.dp_cloud_server_context module
- class dpdispatcher.contexts.dp_cloud_server_context.BohriumContext(*args, **kwargs)[source]
Bases:
BaseContext
Methods
machine_arginfo
()Generate the machine arginfo.
Generate the machine subfields.
bind_submission
check_file_exists
check_finish
check_home_file_exits
clean
download
load_from_dict
read_file
read_home_file
upload
upload_job
write_file
write_home_file
write_local_file
- dpdispatcher.contexts.dp_cloud_server_context.DpCloudServerContext
alias of
BohriumContext
- dpdispatcher.contexts.dp_cloud_server_context.LebesgueContext
alias of
BohriumContext
dpdispatcher.contexts.hdfs_context module
- class dpdispatcher.contexts.hdfs_context.HDFSContext(*args, **kwargs)[source]
Bases:
BaseContext
Methods
check_file_exists
(fname)Check whether the given file exists, often used in checking whether the belonging job has finished.
download
(submission[, check_exists, ...])Download backward files from HDFS root dir.
machine_arginfo
()Generate the machine arginfo.
machine_subfields
()Generate the machine subfields.
upload
(submission[, dereference])Upload forward files and forward command files to HDFS root dir.
bind_submission
check_finish
clean
get_job_root
load_from_dict
read_file
write_file
- check_file_exists(fname)[source]
Check whether the given file exists, often used in checking whether the belonging job has finished.
- Parameters:
- fnamestring
file name to be checked
- Returns:
- status: boolean
- download(submission, check_exists=False, mark_failure=True, back_error=False)[source]
Download backward files from HDFS root dir.
- Parameters:
- submissionSubmission class instance
represents a collection of tasks, such as backward file names
- check_existsbool
whether to check if the file exists
- mark_failurebool
whether to mark the task as failed if the file does not exist
- back_errorbool
whether to download error files
- Returns:
- none
dpdispatcher.contexts.lazy_local_context module
- class dpdispatcher.contexts.lazy_local_context.LazyLocalContext(*args, **kwargs)[source]
Bases:
BaseContext
Run jobs in the local server and local directory.
- Parameters:
- local_rootstr
The local directory to store the jobs.
- remote_rootstr, optional
The argument takes no effect.
- remote_profiledict, optional
The remote profile. The default is {}.
- *args
The arguments.
- **kwargs
The keyword arguments.
Methods
machine_arginfo
()Generate the machine arginfo.
machine_subfields
()Generate the machine subfields.
bind_submission
block_call
block_checkcall
call
check_file_exists
check_finish
clean
download
get_job_root
get_return
load_from_dict
read_file
upload
write_file
dpdispatcher.contexts.local_context module
- class dpdispatcher.contexts.local_context.LocalContext(*args, **kwargs)[source]
Bases:
BaseContext
Run jobs in the local server and remote directory.
- Parameters:
- local_rootstr
The local directory to store the jobs.
- remote_rootstr
The remote directory to store the jobs.
- remote_profiledict, optional
The remote profile. The default is {}.
- *args
The arguments.
- **kwargs
The keyword arguments.
Methods
machine_arginfo
()Generate the machine arginfo.
machine_subfields
()Generate the machine subfields.
bind_submission
block_call
block_checkcall
call
check_file_exists
check_finish
clean
download
get_job_root
get_return
load_from_dict
read_file
upload
write_file
dpdispatcher.contexts.openapi_context module
- class dpdispatcher.contexts.openapi_context.OpenAPIContext(*args, **kwargs)[source]
Bases:
BaseContext
Methods
machine_arginfo
()Generate the machine arginfo.
machine_subfields
()Generate the machine subfields.
bind_submission
check_file_exists
check_finish
check_home_file_exits
clean
download
load_from_dict
read_file
read_home_file
upload
upload_job
write_file
write_home_file
write_local_file
dpdispatcher.contexts.ssh_context module
- class dpdispatcher.contexts.ssh_context.SSHContext(*args, **kwargs)[source]
Bases:
BaseContext
- Attributes:
- sftp
- ssh
Methods
block_checkcall
(cmd[, asynchronously, ...])Run command with arguments.
machine_arginfo
()Generate the machine arginfo.
Generate the machine subfields.
bind_submission
block_call
call
check_file_exists
check_finish
clean
close
download
get_job_root
get_return
list_remote_dir
load_from_dict
read_file
upload
write_file
- block_checkcall(cmd, asynchronously=False, stderr_whitelist=None)[source]
Run command with arguments. Wait for command to complete. If the return code was zero then return, otherwise raise RuntimeError.
- Parameters:
- cmdstr
The command to run.
- asynchronouslybool, optional, default=False
Run command asynchronously. If True, nohup will be used to run the command.
- stderr_whitelistlist of str, optional, default=None
If not None, the stderr will be checked against the whitelist. If the stderr contains any of the strings in the whitelist, the command will be considered successful.
- classmethod machine_subfields() List[Argument] [source]
Generate the machine subfields.
- Returns:
- list[Argument]
machine subfields
- property sftp
- property ssh
- class dpdispatcher.contexts.ssh_context.SSHSession(hostname, username, password=None, port=22, key_filename=None, passphrase=None, timeout=10, totp_secret=None, tar_compress=True, look_for_keys=True)[source]
Bases:
object
- Attributes:
- remote
- rsync_available
sftp
Returns sftp.
Methods
inter_handler
(title, instructions, prompt_list)inter_handler: the callback for paramiko.transport.auth_interactive.
arginfo
close
ensure_alive
exec_command
get
get_ssh_client
put
- exec_command(**kwargs)
- inter_handler(title, instructions, prompt_list)[source]
inter_handler: the callback for paramiko.transport.auth_interactive.
The prototype for this function is defined by Paramiko, so all of the arguments need to be there, even though we don’t use ‘title’ or ‘instructions’.
The function is expected to return a tuple of data containing the responses to the provided prompts. Experimental results suggest that there will be one call of this function per prompt, but the mechanism allows for multiple prompts to be sent at once, so it's best to assume that this can happen.
Since tuples can’t really be built on the fly, the responses are collected in a list which is then converted to a tuple when it’s time to return a value.
Experiments suggest that the username prompt never happens. This makes sense, but the Username prompt is included here just in case.
- property sftp
Returns sftp. Open a new one if not existing.
dpdispatcher.dpcloudserver package
Submodules
dpdispatcher.dpcloudserver.client module
Provide backward compatibility with dflow.
dpdispatcher.entrypoints package
Entry points.
Submodules
dpdispatcher.entrypoints.gui module
DP-GUI entrypoint.
- dpdispatcher.entrypoints.gui.start_dpgui(*, port: int, bind_all: bool, **kwargs)[source]
Host DP-GUI server.
- Parameters:
- portint
The port to serve DP-GUI on.
- bind_allbool
Serve on all public interfaces. This will expose your DP-GUI instance to the network on both IPv4 and IPv6 (where available).
- **kwargs
additional arguments
- Raises:
- ModuleNotFoundError
The dpgui package is not installed
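A usage sketch, assuming the dpgui package is installed (the port is the CLI default):
from dpdispatcher.entrypoints.gui import start_dpgui

# serve DP-GUI locally; requires the optional dpgui package
start_dpgui(port=6042, bind_all=False)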
dpdispatcher.entrypoints.submission module
- dpdispatcher.entrypoints.submission.handle_submission(*, submission_hash: str, download_terminated_log: bool = False, download_finished_task: bool = False, clean: bool = False, reset_fail_count: bool = False)[source]
Handle terminated submission.
- Parameters:
- submission_hashstr
Submission hash to download.
- download_terminated_logbool, optional
Download log files of terminated tasks.
- download_finished_taskbool, optional
Download finished tasks.
- cleanbool, optional
Clean submission.
- reset_fail_countbool, optional
Reset fail count of all jobs to zero.
- Raises:
- ValueError
At least one action should be specified.
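A usage sketch of calling this entry point from Python (the submission hash is a placeholder; at least one action flag must be set):
from dpdispatcher.entrypoints.submission import handle_submission

handle_submission(
    submission_hash="d3a4...",  # placeholder: use a real hash recorded by dpdispatcher
    download_terminated_log=True,
    clean=False,
)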
dpdispatcher.machines package
Machines.
Submodules
dpdispatcher.machines.distributed_shell module
- class dpdispatcher.machines.distributed_shell.DistributedShell(*args, **kwargs)[source]
Bases:
Machine
Methods
do_submit
(job)Submit th job to yarn using distributed shell.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
dpdispatcher.machines.dp_cloud_server module
- class dpdispatcher.machines.dp_cloud_server.Bohrium(*args, **kwargs)[source]
Bases:
Machine
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_local_script
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
map_dp_job_state
serialize
sub_script_cmd
sub_script_head
dpdispatcher.machines.fugaku module
- class dpdispatcher.machines.fugaku.Fugaku(*args, **kwargs)[source]
Bases:
Machine
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
dpdispatcher.machines.lsf module
- class dpdispatcher.machines.lsf.LSF(*args, **kwargs)[source]
Bases:
Machine
LSF batch.
Methods
default_resources
(resources)get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
deserialize
do_submit
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- check_status(**kwargs)
- do_submit(**kwargs)
Submit a single job, assuming that no job is running there.
dpdispatcher.machines.openapi module
- class dpdispatcher.machines.openapi.OpenAPI(*args, **kwargs)[source]
Bases:
Machine
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_local_script
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
map_dp_job_state
serialize
sub_script_cmd
sub_script_head
dpdispatcher.machines.pbs module
- class dpdispatcher.machines.pbs.PBS(*args, **kwargs)[source]
Bases:
Machine
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- class dpdispatcher.machines.pbs.SGE(*args, **kwargs)[source]
Bases:
PBS
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- class dpdispatcher.machines.pbs.Torque(*args, **kwargs)[source]
Bases:
PBS
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
dpdispatcher.machines.shell module
- class dpdispatcher.machines.shell.Shell(*args, **kwargs)[source]
Bases:
Machine
Methods
do_submit
(job)Submit a single job, assuming that no job is running there.
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
resources_subfields
()Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
dpdispatcher.machines.slurm module
- class dpdispatcher.machines.slurm.Slurm(*args, **kwargs)[source]
Bases:
Machine
Methods
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
do_submit
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- check_status(**kwargs)
- do_submit(**kwargs)
Submit a single job, assuming that no job is running there.
- class dpdispatcher.machines.slurm.SlurmJobArray(*args, **kwargs)[source]
Bases:
Slurm
Slurm with job array enabled for multiple tasks in a job.
Methods
get_exit_code
(job)Get exit code of the job.
kill
(job)Kill the job.
resources_arginfo
()Generate the resources arginfo.
Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
do_submit
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- check_status(**kwargs)
dpdispatcher.utils package
Utils.
Subpackages
dpdispatcher.utils.dpcloudserver package
- class dpdispatcher.utils.dpcloudserver.Client(email=None, password=None, debug=False, ticket=None, base_url='https://bohrium.dp.tech/')[source]
Bases:
object
Methods
download
download_from_url
get
get_job_detail
get_job_result_url
get_log
get_tasks_list
job_create
kill
post
refresh_token
upload
Submodules
dpdispatcher.utils.dpcloudserver.client module
- class dpdispatcher.utils.dpcloudserver.client.Client(email=None, password=None, debug=False, ticket=None, base_url='https://bohrium.dp.tech/')[source]
Bases:
object
Methods
download
download_from_url
get
get_job_detail
get_job_result_url
get_log
get_tasks_list
job_create
kill
post
refresh_token
upload
dpdispatcher.utils.dpcloudserver.config module
dpdispatcher.utils.dpcloudserver.retcode module
- class dpdispatcher.utils.dpcloudserver.retcode.RETCODE[source]
Bases:
object
- DATAERR = '2002'
- DBERR = '2000'
- IOERR = '2003'
- NODATA = '2300'
- OK = '0000'
- PARAMERR = '2101'
- PWDERR = '2104'
- REQERR = '2200'
- ROLEERR = '2103'
- THIRDERR = '2001'
- TOKENINVALID = '2100'
- UNDERDEBUG = '2301'
- UNKOWNERR = '2400'
- USERERR = '2102'
- VERIFYERR = '2105'
dpdispatcher.utils.dpcloudserver.zip_file module
Submodules
dpdispatcher.utils.hdfs_cli module
- class dpdispatcher.utils.hdfs_cli.HDFS[source]
Bases:
object
Fundamental class for HDFS basic manipulation.
Methods
copy_from_local
(local_path, to_uri)Returns: True on success Raises: on unexpected error.
exists
(uri)Check existence of hdfs uri Returns: True on exists Raises: RuntimeError.
mkdir
(uri)Make new hdfs directory Returns: True on success Raises: RuntimeError.
remove
(uri)Remove the hdfs uri if it exists. Returns: True on success Raises: RuntimeError.
copy_to_local
move
read_hdfs_file
- static copy_from_local(local_path, to_uri)[source]
Returns: True on success Raises: on unexpected error.
dpdispatcher.utils.job_status module
dpdispatcher.utils.record module
dpdispatcher.utils.utils module
- exception dpdispatcher.utils.utils.RetrySignal[source]
Bases:
Exception
Exception to give a signal to retry the function.
- dpdispatcher.utils.utils.customized_script_header_template(filename: PathLike, resources: Resources) str [source]
- dpdispatcher.utils.utils.generate_totp(secret: str, period: int = 30, token_length: int = 6) str [source]
Generate time-based one time password (TOTP) from the secret.
Some HPCs use TOTP for two-factor authentication for safety.
- Parameters:
- secretstr
The encoded secret provided by the HPC. It’s usually extracted from a 2D code and base32 encoded.
- periodint, default=30
Time period where the code is valid in seconds.
- token_lengthint, default=6
The token length.
- Returns:
- token: str
The generated token.
References
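A usage sketch (the secret below is a made-up base32 string for illustration):
from dpdispatcher.utils.utils import generate_totp

# generate a 6-digit time-based one-time password from a (made-up) base32 secret
token = generate_totp("JBSWY3DPEHPK3PXP")
print(token)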
- dpdispatcher.utils.utils.get_sha256(filename)[source]
Get sha256 of a file.
- Parameters:
- filenamestr
The filename.
- Returns:
- sha256: str
The sha256.
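A usage sketch (the filename is a placeholder):
from dpdispatcher.utils.utils import get_sha256

# compute the sha256 checksum of a local file
print(get_sha256("machine.json"))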
- dpdispatcher.utils.utils.retry(max_retry: int = 3, sleep: int | float = 60, catch_exception: ~typing.Type[BaseException] = <class 'dpdispatcher.utils.utils.RetrySignal'>) Callable [source]
Retry the function until it succeeds or fails for certain times.
- Parameters:
- max_retryint, default=3
The maximum retry times. If None, it will retry forever.
- sleepint or float, default=60
The sleep time in seconds.
- catch_exceptionException, default=Exception
The exception to catch.
- Returns:
- decorator: Callable
The decorator.
Examples
>>> @retry(max_retry=3, sleep=60, catch_exception=RetrySignal)
... def func():
...     raise RetrySignal("Failed")
- dpdispatcher.utils.utils.rsync(from_file: str, to_file: str, port: int = 22, key_filename: str | None = None, timeout: int | float = 10)[source]
Call rsync to transfer files.
- Parameters:
- from_filestr
SRC
- to_filestr
DEST
- portint, default=22
port for ssh
- key_filenamestr, optional
identity file name
- timeoutint, default=10
timeout for ssh
- Raises:
- RuntimeError
when return code is not 0
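A usage sketch (the host and paths are placeholders):
from dpdispatcher.utils.utils import rsync

# copy a local file to a remote host over ssh using rsync
rsync("./local_dir/file.txt", "user123@remote.host:/remote_dir/file.txt", port=22, timeout=10)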
Submodules
dpdispatcher.arginfo module
dpdispatcher.base_context module
- class dpdispatcher.base_context.BaseContext(*args, **kwargs)[source]
Bases:
object
Methods
Generate the machine arginfo.
Generate the machine subfields.
bind_submission
check_finish
clean
download
load_from_dict
read_file
upload
write_file
- classmethod machine_arginfo() Argument [source]
Generate the machine arginfo.
- Returns:
- Argument
machine arginfo
- classmethod machine_subfields() List[Argument] [source]
Generate the machine subfields.
- Returns:
- list[Argument]
machine subfields
- options = {'BohriumContext', 'HDFSContext', 'LazyLocalContext', 'LocalContext', 'OpenAPIContext', 'SSHContext'}
- subclasses_dict = {'Bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'BohriumContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServer': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServerContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'HDFS': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'HDFSContext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'LazyLocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'LazyLocalContext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'Lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'LebesgueContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'Local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'LocalContext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'OpenAPI': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'OpenAPIContext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'SSH': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'SSHContext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'bohriumcontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudserver': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudservercontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'hdfs': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'hdfscontext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'lazylocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lazylocalcontext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'lebesguecontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'localcontext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'openapi': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'openapicontext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'ssh': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'sshcontext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>}
dpdispatcher.dlog module
dpdispatcher.dpdisp module
- dpdispatcher.dpdisp.main_parser() ArgumentParser [source]
Dpdispatcher commandline options argument parser.
- Returns:
- argparse.ArgumentParser
the argument parser
Notes
This function is used by documentation.
dpdispatcher.machine module
- class dpdispatcher.machine.Machine(*args, **kwargs)[source]
Bases:
object
A machine is used to handle the connection with remote machines.
- Parameters:
- contextSubClass derived from BaseContext
The context is used to maintain the connection with the remote machine.
Methods
do_submit(job): Submit a single job, assuming that no job is running there.
get_exit_code(job): Get the exit code of the job.
kill(job): Kill the job.
resources_arginfo(): Generate the resources arginfo.
resources_subfields(): Generate the resources subfields.
arginfo
bind_context
check_finish_tag
check_if_recover
check_status
default_resources
deserialize
gen_command_env_cuda_devices
gen_script
gen_script_command
gen_script_custom_flags_lines
gen_script_end
gen_script_env
gen_script_header
gen_script_run_command
gen_script_wait
load_from_dict
load_from_json
load_from_yaml
serialize
sub_script_cmd
sub_script_head
- kill(job)[source]
Kill the job.
If not implemented, pass and let the user manually kill it.
- Parameters:
- jobJob
job
- options = {'Bohrium', 'DistributedShell', 'Fugaku', 'LSF', 'OpenAPI', 'PBS', 'SGE', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}
- classmethod resources_arginfo() Argument [source]
Generate the resources arginfo.
- Returns:
- Argument
resources arginfo
- classmethod resources_subfields() List[Argument] [source]
Generate the resources subfields.
- Returns:
- list[Argument]
resources subfields
- subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'SGE': <class 'dpdispatcher.machines.pbs.SGE'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'sge': <class 'dpdispatcher.machines.pbs.SGE'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}
dpdispatcher.submission module
- class dpdispatcher.submission.Job(job_task_list, *, resources, machine=None)[source]
Bases:
object
Job is generated by Submission automatically. A job usually contains many tasks and may request computing resources from the job scheduler system. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.
- Parameters:
- job_task_listlist of Task
the tasks belonging to the job
- resourcesResources
the machine resources. Passed from Submission when it constructs jobs.
- machinemachine
machine object to execute the job. Passed from Submission when it constructs jobs.
Methods
deserialize(job_dict[, machine]): Convert the job_dict to a Job class object.
get_job_state(): Get the state of the job from the job scheduler system.
get_last_error_message(): Get the last error message when the job is terminated.
serialize([if_static]): Convert the Job class instance to a dictionary.
get_hash
handle_unexpected_job_state
job_to_json
register_job_id
submit_job
- classmethod deserialize(job_dict, machine=None)[source]
Convert the job_dict to a Job class object.
- Parameters:
- job_dictdict
the dictionary which contains the job information
- machineMachine
the machine object to execute the job
- Returns:
- jobJob
the Job class instance converted from the job_dict
- get_job_state()[source]
Get the state of the job. Usually, this method queries the database of the Slurm or PBS job scheduler system and gets the result.
Notes
This method will not submit or resubmit the job if it is unsubmitted.
- class dpdispatcher.submission.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]
Bases:
object
Resources is used to describe the machine resources we need to do calculations.
- Parameters:
- number_nodeint
The number of nodes needed for each job.
- cpu_per_nodeint
The number of CPUs on each node.
- gpu_per_nodeint
The number of GPUs on each node.
- queue_namestr
The queue name of the batch job scheduler system.
- group_sizeint
The number of tasks in a job.
- custom_flagslist of Str
The extra lines passed to the job submission script header.
- strategydict
Strategies used to generate job submission scripts.
if_cuda_multi_devices : bool
Whether to assign tasks to different GPUs when a node has multiple NVIDIA GPUs. If true, dpdispatcher exports the environment variable CUDA_VISIBLE_DEVICES separately for each task. Usually used together with the Task.task_need_resources variable.
- ratio_unfinishedfloat
The ratio of tasks that are allowed to be unfinished.
- customized_script_header_template_filestr
The customized template file used to generate the job submission script header; it overrides the default file.
- para_degint
The number of tasks to run in parallel within one job. Usually used together with strategy["if_cuda_multi_devices"].
- source_listlist of Path
The environment files to be sourced before the command executes.
- wait_timeint
The waiting time in seconds after a single task is submitted. Default: 0.
Methods
arginfo
deserialize
load_from_dict
load_from_json
load_from_yaml
serialize
- class dpdispatcher.submission.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]
Bases:
object
A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.
- Parameters:
- work_basePath
the base directory of the local tasks; it is usually the directory name of the project.
- machineMachine
machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after instantiation with the bind_machine method.
- resourcesResources
the machine resources (cpu or gpu) used to generate the slurm/pbs script
- forward_common_fileslist
the common files to be uploaded to other computers before the jobs begin
- backward_common_fileslist
the common files to be downloaded from other computers after the jobs finish
- task_listlist of Task
a list of tasks to be run.
Methods
async_run_submission(**kwargs): Async interface of run_submission.
bind_machine(machine): Bind this submission to a machine.
check_all_finished(): Check whether all the jobs in the submission have finished.
check_ratio_unfinished(ratio_unfinished): Calculate the ratio of unfinished tasks in the submission.
deserialize(submission_dict[, machine]): Convert the submission_dict to a Submission class object.
generate_jobs(): Generate the jobs from the registered tasks and add them to self.belonging_jobs.
handle_unexpected_submission_state(): Handle unexpected job states of the submission.
run_submission(*[, dry_run, exit_on_submit, ...]): Main method to execute the submission.
serialize([if_static]): Convert the Submission class instance to a dictionary.
update_submission_state(): Check the states of the jobs in the submission.
clean_jobs
download_jobs
get_hash
register_task
register_task_list
remove_unfinished_tasks
submission_from_json
submission_to_json
try_download_result
try_recover_from_json
upload_jobs
- async async_run_submission(**kwargs)[source]
Async interface of run_submission.
Examples
>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> async def run_jobs():
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())
This may raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.
- bind_machine(machine)[source]
Bind this submission to a machine, and update the remote_root and local_root of the machine's context.
- Parameters:
- machineMachine
the machine to bind with
- check_all_finished()[source]
Check whether all the jobs in the submission have finished.
Notes
This method will not handle unexpected job state in the submission.
- check_ratio_unfinished(ratio_unfinished: float) bool [source]
Calculate the ratio of unfinished tasks in the submission.
- Parameters:
- ratio_unfinishedfloat
the threshold for the ratio of unfinished tasks in the submission
- Returns:
- bool
whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished
- classmethod deserialize(submission_dict, machine=None)[source]
Convert the submission_dict to a Submission class object.
- Parameters:
- submission_dictdict
the dictionary which contains the submission information
- machineMachine
Machine class Object to execute the jobs
- Returns:
- submissionSubmission
the Submission class instance converted from the submission_dict
- generate_jobs()[source]
After the tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The tasks are grouped into jobs randomly, with self.resources.group_size tasks per job; the random shuffle is done for load balancing. The random seed is a constant (to be concrete, 42), which ensures that the same jobs are generated when the program is re-run.
- handle_unexpected_submission_state()[source]
Handle unexpected job states of the submission. If a job state is unsubmitted, submit the job. If a job state is terminated (killed unexpectedly), resubmit the job. If a job state is unknown, raise an error.
- run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]
Main method to execute the submission. First, check whether an old submission exists on the remote machine and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission finish and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method will exit. If exit_on_submit is True, the method will exit right after the jobs are submitted.
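For example, assuming a Submission instance named submission has already been constructed and bound to a machine, a dry run or a normal blocking run might look like:
# either: upload files and generate the job scripts, but do not execute them
submission.run_submission(dry_run=True, clean=False)

# or: normal blocking run, polling the scheduler every 60 seconds
submission.run_submission(check_interval=60)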
- serialize(if_static=False)[source]
Convert the Submission class instance to a dictionary.
- Parameters:
- if_staticbool
whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.
- Returns:
- submission_dictdict
the dictionary converted from the Submission class instance
- class dpdispatcher.submission.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]
Bases:
object
A task is a sequential command to be executed, together with the files it depends on, which are transmitted forward and backward.
- Parameters:
- commandStr
the command to be executed.
- task_work_pathPath
the working directory of the task, where its dependent files are located.
- forward_fileslist of Path
the files to be transmitted to the remote machine before the command executes.
- backward_fileslist of Path
the files to be transmitted back from the remote machine after the command finishes.
- outlogStr
the filename to which the command redirects stdout
- errlogStr
the filename to which the command redirects stderr
Methods
deserialize(task_dict): Convert the task_dict to a Task class object.
get_task_state(context): Get the task state by checking the tag file.
arginfo
get_hash
load_from_dict
load_from_json
load_from_yaml
serialize
- classmethod deserialize(task_dict)[source]
Convert the task_dict to a Task class object.
- Parameters:
- task_dictdict
the dictionary which contains the task information
- Returns:
- taskTask
the Task class instance converted from the task_dict
Running the DeePMD-kit on the Expanse cluster
Expanse is a cluster operated by the San Diego Supercomputer Center. Here we provide an example of running jobs on Expanse.
The machine parameters are provided below. Expanse uses the Slurm workload manager for job scheduling, and remote_root has been created in advance. It is worth mentioning that we do not recommend using a password, so SSH keys are used instead to improve security.
{
    "batch_type": "Slurm",
    "local_root": "./",
    "remote_root": "/expanse/lustre/scratch/njzjz/temp_project/dpgen_workdir",
    "clean_asynchronously": true,
    "context_type": "SSHContext",
    "remote_profile": {
        "hostname": "login.expanse.sdsc.edu",
        "username": "njzjz",
        "port": 22
    }
}
Expanse’s standard compute nodes are each powered by two 64-core AMD EPYC 7742 processors and contain 256 GB of DDR4 memory. Here, we request one node with 32 cores and 16 GB of memory from the shared partition. Expanse does not support the --gres=gpu:0 option, so we use custom_gpu_line to customize the statement.
{
    "number_node": 1,
    "cpu_per_node": 1,
    "gpu_per_node": 0,
    "queue_name": "shared",
    "group_size": 1,
    "custom_flags": [
        "#SBATCH -c 32",
        "#SBATCH --mem=16G",
        "#SBATCH --time=48:00:00",
        "#SBATCH --account=rut149",
        "#SBATCH --requeue"
    ],
    "source_list": [
        "activate /home/njzjz/deepmd-kit"
    ],
    "envs": {
        "OMP_NUM_THREADS": 4,
        "TF_INTRA_OP_PARALLELISM_THREADS": 4,
        "TF_INTER_OP_PARALLELISM_THREADS": 8,
        "DP_AUTO_PARALLELIZATION": 1
    },
    "batch_type": "Slurm",
    "kwargs": {
        "custom_gpu_line": "#SBATCH --gpus=0"
    }
}
The following task parameters run a DeePMD-kit task, forwarding the input file and bringing back the frozen graph files. Here, the data set is shared among all the tasks, so it is not included in forward_files. Instead, it should be included in the submission's forward_common_files, as sketched in the short script after the JSON below.
{
    "command": "dp train input.json && dp freeze && dp compress",
    "task_work_path": "model1/",
    "forward_files": [
        "input.json"
    ],
    "backward_files": [
        "frozen_model.pb",
        "frozen_model_compressed.pb"
    ],
    "outlog": "log",
    "errlog": "err"
}
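These three JSON files can be tied together with a short Python script; the work_base and data-set directory names below are placeholders for whatever the project actually uses:
from dpdispatcher import Machine, Resources, Submission, Task

machine = Machine.load_from_json("machine.json")
resources = Resources.load_from_json("resources.json")
task = Task.load_from_json("task.json")

submission = Submission(
    work_base="train_workdir/",  # placeholder project directory
    machine=machine,
    resources=resources,
    task_list=[task],
    # the shared data set is uploaded once for all tasks
    forward_common_files=["data_set/"],
    backward_common_files=[],
)
submission.run_submission()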
Running Gaussian 16 with failure allowed
Typically, a task will be retried three times if the exit code is not zero. Sometimes, one may want to allow a non-zero exit code. For example, when running a large number of Gaussian 16 single-point calculation tasks, some of them may throw SCF errors and return a non-zero code. One can append ||: to the command:
{
    "command": "g16 < input > output ||:",
    "task_work_path": "p1/",
    "forward_files": [
        "input"
    ],
    "backward_files": [
        "output"
    ]
}
This ensures the command always returns a zero exit code.
Running multiple MD tasks on a GPU workstation
In this example, we show how to run multiple MD tasks on a GPU workstation. This workstation does not have any job scheduling software installed, so we use Shell as the batch_type.
{
    "batch_type": "Shell",
    "local_root": "./",
    "remote_root": "/data2/jinzhe/dpgen_workdir",
    "clean_asynchronously": true,
    "context_type": "SSHContext",
    "remote_profile": {
        "hostname": "mandu.iqb.rutgers.edu",
        "username": "jz748",
        "port": 22
    }
}
The workstation has 48 CPU cores and 8 RTX 3090 cards. Here we want each card to run 6 tasks at the same time, as each task does not consume many GPU resources. Thus, strategy/if_cuda_multi_devices is set to true and para_deg is set to 6.
{
    "number_node": 1,
    "cpu_per_node": 48,
    "gpu_per_node": 8,
    "queue_name": "shell",
    "group_size": 0,
    "strategy": {
        "if_cuda_multi_devices": true
    },
    "source_list": [
        "activate /home/jz748/deepmd-kit"
    ],
    "envs": {
        "OMP_NUM_THREADS": 1,
        "TF_INTRA_OP_PARALLELISM_THREADS": 1,
        "TF_INTER_OP_PARALLELISM_THREADS": 1
    },
    "para_deg": 6
}
Note that group_size should be set to 0 (which means infinity) to ensure there is only one job and avoid running multiple jobs at the same time.
Customizing the submission script header
When submitting jobs to some clusters, such as the Tiger Cluster at Princeton University, the Slurm header is quite different from the standard one. In this case, DPDispatcher allows users to customize the templates by setting strategy/customized_script_header_template_file
to a template file:
{
    "number_node": 1,
    "cpu_per_node": 32,
    "kwargs": {
        "qos": "tiger-vshort"
    },
    "source_list": ["activate abacus_env"],
    "strategy": {
        "customized_script_header_template_file": "./template.slurm"
    },
    "group_size": 2000
}
template.slurm is the template file, where str.format() is used to format the template with the Resources parameters:
#!/bin/bash -l
#SBATCH --parsable
#SBATCH --nodes={number_node}
#SBATCH --ntasks-per-node={cpu_per_node}
#SBATCH --qos={kwargs[qos]}
#SBATCH --time=01:02:00
#SBATCH --mem-per-cpu=4G
See Python Format String Syntax for how to insert parameters inside the template.
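To illustrate how the placeholders are filled, here is a small standalone sketch of the same str.format() mechanism, using the values from the resources example above:
# how {number_node}, {cpu_per_node} and {kwargs[qos]} get substituted
header_template = (
    "#SBATCH --nodes={number_node}\n"
    "#SBATCH --ntasks-per-node={cpu_per_node}\n"
    "#SBATCH --qos={kwargs[qos]}\n"
)
print(
    header_template.format(
        number_node=1,
        cpu_per_node=32,
        kwargs={"qos": "tiger-vshort"},
    )
)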