DPDispatcher’s documentation

DPDispatcher is a Python package used to generate input scripts for HPC (High Performance Computing) scheduler systems (Slurm/PBS/LSF/dpcloudserver), submit these scripts to HPC systems, and monitor them until they finish.

DPDispatcher will monitor (poke) these jobs until they finish and download the result files (if the jobs are running on remote systems connected by SSH).

Install DPDispatcher

DPDispatcher can be installed via pip:

pip install dpdispatcher

To add Bohrium support, execute

pip install dpdispatcher[bohrium]

Getting Started

DPDispatcher provides the following classes:

  • Task class, which represents a command to be run on the batch job system, as well as the essential files needed by the command.

  • Submission class, which represents a collection of jobs to be run on the HPC system, together with the common files they may need uploaded. DPDispatcher creates and submits these jobs when a submission instance executes its run_submission method. This method polls until the jobs finish and then returns.

  • Job class, a class used by the Submission class, which represents a job on the HPC system. Submission will automatically generate the jobs' submitting scripts used by HPC systems from the Task and Resources.

  • Resources class, which represents the computing resources for each job within a submission.

You can use DPDispatcher in a Python script to submit five tasks:

from dpdispatcher import Machine, Resources, Task, Submission

machine = Machine.load_from_json("machine.json")
resources = Resources.load_from_json("resources.json")

task0 = Task.load_from_json("task.json")

task1 = Task(
    command="cat example.txt",
    task_work_path="dir1/",
    forward_files=["example.txt"],
    backward_files=["out.txt"],
    outlog="out.txt",
)
task2 = Task(
    command="cat example.txt",
    task_work_path="dir2/",
    forward_files=["example.txt"],
    backward_files=["out.txt"],
    outlog="out.txt",
)
task3 = Task(
    command="cat example.txt",
    task_work_path="dir3/",
    forward_files=["example.txt"],
    backward_files=["out.txt"],
    outlog="out.txt",
)
task4 = Task(
    command="cat example.txt",
    task_work_path="dir4/",
    forward_files=["example.txt"],
    backward_files=["out.txt"],
    outlog="out.txt",
)

task_list = [task0, task1, task2, task3, task4]

submission = Submission(
    work_base="lammps_md_300K_5GPa/",
    machine=machine,
    resources=resources,
    task_list=task_list,
    forward_common_files=["graph.pb"],
    backward_common_files=[],
)

submission.run_submission()

where machine.json is

{
    "batch_type": "Slurm",
    "context_type": "SSHContext",
    "local_root" : "/home/user123/workplace/22_new_project/",
    "remote_root": "/home/user123/dpdispatcher_work_dir/",
    "remote_profile":{
        "hostname": "39.106.xx.xxx",
        "username": "user123",
        "port": 22,
        "timeout": 10
    }
}

resources.json is

{
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 1,
    "queue_name": "GPUV100",
    "group_size": 5
}

and task.json is

{
    "command": "lmp -i input.lammps",
    "task_work_path": "bct-0/",
    "forward_files": [
        "conf.lmp",
        "input.lammps"
    ],
    "backward_files": [
        "log.lammps"
    ],
    "outlog": "log",
    "errlog": "err",
}

You may also submit multi-GPU jobs; here is a more complex resources example:

resources = Resources(
    number_node=1,
    cpu_per_node=4,
    gpu_per_node=2,
    queue_name="GPU_2080Ti",
    group_size=4,
    custom_flags=["#SBATCH --nice=100", "#SBATCH --time=24:00:00"],
    strategy={
        # used when you want to set CUDA_VISIBLE_DEVICES automatically
        "if_cuda_multi_devices": True
    },
    para_deg=1,
    # will unload these modules before running tasks
    module_unload_list=["singularity"],
    # will load these modules before running tasks
    module_list=["singularity/3.0.0"],
    # will source the environment files before running tasks
    source_list=["./slurm_test.env"],
    # the envs option is used to export environment variables
    # And it will generate a line like below.
    # export DP_DISPATCHER_EXPORT=test_foo_bar_baz
    envs={"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"},
)
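
The same settings can also be kept in a JSON file and loaded with Resources.load_from_json. A minimal sketch of the equivalent resources.json (key names follow the constructor arguments; the values are illustrative):

{
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 2,
    "queue_name": "GPU_2080Ti",
    "group_size": 4,
    "custom_flags": ["#SBATCH --nice=100", "#SBATCH --time=24:00:00"],
    "strategy": {"if_cuda_multi_devices": true},
    "para_deg": 1,
    "module_unload_list": ["singularity"],
    "module_list": ["singularity/3.0.0"],
    "source_list": ["./slurm_test.env"],
    "envs": {"DP_DISPATCHER_EXPORT": "test_foo_bar_baz"}
}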

The details of parameters can be found in Machine Parameters, Resources Parameters, and Task Parameters.

Supported contexts

Context is the way to connect to the remote server. One needs to set context_type to one of the following values:

LazyLocal

context_type: LazyLocal

LazyLocal directly runs jobs in the local server and local directory.

Local

context_type: Local

Local runs jobs in the local server, but in a different directory. Files will be copied to the remote directory before jobs start and copied back after jobs finish.
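
For example, a minimal machine.json sketch for the Local context (the paths are illustrative and should point to directories on your own machine):

{
    "batch_type": "Shell",
    "context_type": "LocalContext",
    "local_root": "/path/to/project/",
    "remote_root": "/path/to/local/work_dir/"
}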

SSH

context_type: SSH

SSH runs jobs in a remote server. Files will be copied to the remote directory via SSH channels before jobs start and copied back after jobs finish. To use SSH, one needs to provide necessary parameters in remote_profile, such as username and hostname.

It’s suggested to generate SSH keys and transfer the public key to the remote server in advance, which is more secure than password authentication.

Note that SSH context is non-login, so bash_profile files will not be executed.
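
For example, a machine.json sketch that uses key-based authentication (the hostname, paths, and key file are illustrative; the remote_profile keys are documented in Machine Parameters):

{
    "batch_type": "Slurm",
    "context_type": "SSHContext",
    "local_root": "/path/to/project/",
    "remote_root": "/path/to/remote/work_dir/",
    "remote_profile": {
        "hostname": "cluster.example.org",
        "username": "user123",
        "port": 22,
        "key_filename": "/home/user123/.ssh/id_rsa",
        "timeout": 10
    }
}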

Bohrium

context_type: Bohrium

Bohrium is the cloud platform for scientific computing. Read Bohrium documentation for details. To use Bohrium, one needs to provide necessary parameters in remote_profile.
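
A hedged machine.json sketch for the Bohrium context is shown below; the remote_profile keys are the ones documented in Machine Parameters, while the contents of input_data (the job configuration) depend on the Bohrium platform and are left as a placeholder here:

{
    "batch_type": "Bohrium",
    "context_type": "BohriumContext",
    "local_root": "/path/to/project/",
    "remote_profile": {
        "email": "user@example.com",
        "password": "your-password",
        "program_id": 1234,
        "input_data": {}
    }
}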

HDFS

context_type: HDFS

The Hadoop Distributed File System (HDFS) is a distributed file system. Read Support DPDispatcher on Yarn for details.

OpenAPI

context_type: OpenAPI

OpenAPI is a new way to submit jobs to Bohrium. It uses an AccessKey instead of a username and password. Read Bohrium documentation for details. To use OpenAPI, one needs to provide necessary parameters in remote_profile.

Supported batch job systems

Batch job system is a system to process batch jobs. One needs to set batch_type to one of the following values:

Bash

batch_type: Shell

When batch_type is set to Shell, dpdispatcher will generate a bash script to process jobs. No extra packages are required for Shell.

Due to the lack of a scheduling system, Shell runs all jobs at the same time. To avoid running multiple jobs simultaneously, one can set group_size to 0 (which means infinity) to generate only one job containing all tasks.
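
For example, a resources.json sketch that packs all tasks into a single job so that they run one after another (with the default para_deg of 1); queue_name can be left empty for Shell:

{
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 0,
    "queue_name": "",
    "group_size": 0
}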

Slurm

batch_type: Slurm, SlurmJobArray

Slurm is a job scheduling system used by lots of HPCs. One needs to make sure Slurm has been set up on the remote server and the related environment is activated.

When SlurmJobArray is used, dpdispatcher submits Slurm jobs with job arrays. In this way, several dpdispatcher tasks map to a Slurm job and a dpdispatcher job maps to a Slurm job array. Millions of Slurm jobs can be submitted quickly and Slurm can execute all Slurm jobs at the same time. One can use group_size and slurm_job_size to control how many Slurm jobs are contained in a Slurm job array.
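
For example, a hedged resources.json sketch for SlurmJobArray (set batch_type to SlurmJobArray in machine.json); here each dpdispatcher job groups 10 tasks and each Slurm job in the array handles 2 of them, so the array would contain 5 Slurm jobs (values are illustrative):

{
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 1,
    "queue_name": "GPUV100",
    "group_size": 10,
    "kwargs": {
        "slurm_job_size": 2
    }
}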

OpenPBS or PBSPro

batch_type: PBS

OpenPBS is an open-source job scheduling system of the Linux Foundation, and PBS Professional is its commercial counterpart. One needs to make sure OpenPBS has been set up on the remote server and the related environment is activated.

Note: do not use PBS for TORQUE.

TORQUE

batch_type: Torque

The Terascale Open-source Resource and QUEue Manager (TORQUE) is a distributed resource manager based on standard OpenPBS. However, not all OpenPBS flags are still supported in TORQUE. One needs to make sure TORQUE has been set up on the remote server and the related environment is activated.

LSF

batch_type: LSF

IBM Spectrum LSF Suites is a comprehensive workload management solution used by HPCs. One needs to make sure LSF has been set up on the remote server and the related environment is activated.

Bohrium

batch_type: Bohrium

Bohrium is the cloud platform for scientific computing. Read Bohrium documentation for details.

DistributedShell

batch_type: DistributedShell

DistributedShell is used to submit yarn jobs. Read Support DPDispatcher on Yarn for details.

Fugaku

batch_type: Fugaku

Fujitsu cloud service is a job scheduling system used by Fujitsu's HPCs such as Fugaku, ITO, and the K computer. Note that although these HPCs use the same job scheduling system, there are differences in the details, so the Fugaku class cannot be used directly for other HPCs.

Read Fujitsu cloud service documentation for details.

OpenAPI

batch_type: OpenAPI

OpenAPI is a new way to submit jobs to Bohrium. It uses an AccessKey instead of a username and password. Read Bohrium documentation for details.

Machine parameters

Note

One can load, modify, and export the input file by using our effective web-based tool DP-GUI online or hosted using the command line interface dpdisp gui. All parameters below can be set in DP-GUI. By clicking “SAVE JSON”, one can download the input file.

machine:
type: dict
argument path: machine
batch_type:
type: str
argument path: machine/batch_type

The batch job system type. Option: Bohrium, SlurmJobArray, DistributedShell, LSF, Fugaku, Torque, Shell, PBS, Slurm, OpenAPI

local_root:
type: str | NoneType
argument path: machine/local_root

The dir where the tasks and related files are located. Typically the project dir.

remote_root:
type: str | NoneType, optional
argument path: machine/remote_root

The dir where the tasks are executed on the remote machine. Only needed when context is not lazy-local.

clean_asynchronously:
type: bool, optional, default: False
argument path: machine/clean_asynchronously

Clean the remote directory asynchronously after the job finishes.

Depending on the value of context_type, different sub args are accepted.

context_type:
type: str (flag key)
argument path: machine/context_type

The connection used to the remote machine. Option: LocalContext, SSHContext, HDFSContext, OpenAPIContext, LazyLocalContext, BohriumContext

When context_type is set to SSHContext (or its aliases sshcontext, SSH, ssh):

remote_profile:
type: dict
argument path: machine[SSHContext]/remote_profile

The information used to maintain the connection with remote machine.

hostname:
type: str
argument path: machine[SSHContext]/remote_profile/hostname

hostname or ip of ssh connection.

username:
type: str
argument path: machine[SSHContext]/remote_profile/username

username of target linux system

password:
type: str, optional
argument path: machine[SSHContext]/remote_profile/password

(deprecated) password of linux system. Please use SSH keys instead to improve security.

port:
type: int, optional, default: 22
argument path: machine[SSHContext]/remote_profile/port

ssh connection port.

key_filename:
type: str | NoneType, optional, default: None
argument path: machine[SSHContext]/remote_profile/key_filename

key filename used by ssh connection. If left None, find key in ~/.ssh or use password for login

passphrase:
type: str | NoneType, optional, default: None
argument path: machine[SSHContext]/remote_profile/passphrase

passphrase of key used by ssh connection

timeout:
type: int, optional, default: 10
argument path: machine[SSHContext]/remote_profile/timeout

timeout of ssh connection

totp_secret:
type: str | NoneType, optional, default: None
argument path: machine[SSHContext]/remote_profile/totp_secret

Time-based one time password secret. It should be a base32-encoded string extracted from the 2D code.

tar_compress:
type: bool, optional, default: True
argument path: machine[SSHContext]/remote_profile/tar_compress

The archive will be compressed in upload and download if it is True. If not, compression will be skipped.

look_for_keys:
type: bool, optional, default: True
argument path: machine[SSHContext]/remote_profile/look_for_keys

enable searching for discoverable private key files in ~/.ssh/

When context_type is set to LocalContext (or its aliases localcontext, Local, local):

remote_profile:
type: dict, optional
argument path: machine[LocalContext]/remote_profile

The information used to maintain the connection with remote machine. This field is empty for this context.

When context_type is set to HDFSContext (or its aliases hdfscontext, HDFS, hdfs):

remote_profile:
type: dict, optional
argument path: machine[HDFSContext]/remote_profile

The information used to maintain the connection with remote machine. This field is empty for this context.

When context_type is set to LazyLocalContext (or its aliases lazylocalcontext, LazyLocal, lazylocal):

remote_profile:
type: dict, optional
argument path: machine[LazyLocalContext]/remote_profile

The information used to maintain the connection with remote machine. This field is empty for this context.

When context_type is set to OpenAPIContext (or its aliases openapicontext, OpenAPI, openapi):

remote_profile:
type: dict, optional
argument path: machine[OpenAPIContext]/remote_profile

The information used to maintain the connection with remote machine. This field is empty for this context.

When context_type is set to BohriumContext (or its aliases bohriumcontext, Bohrium, bohrium, DpCloudServerContext, dpcloudservercontext, DpCloudServer, dpcloudserver, LebesgueContext, lebesguecontext, Lebesgue, lebesgue):

remote_profile:
type: dict
argument path: machine[BohriumContext]/remote_profile

The information used to maintain the connection with remote machine.

email:
type: str, optional
argument path: machine[BohriumContext]/remote_profile/email

Email

password:
type: str, optional
argument path: machine[BohriumContext]/remote_profile/password

Password

program_id:
type: int, alias: project_id
argument path: machine[BohriumContext]/remote_profile/program_id

Program ID

retry_count:
type: NoneType | int, optional, default: 2
argument path: machine[BohriumContext]/remote_profile/retry_count

The retry count when a job is terminated

ignore_exit_code:
type: bool, optional, default: True
argument path: machine[BohriumContext]/remote_profile/ignore_exit_code

The job state will be marked as finished if the exit code is non-zero when set to True. Otherwise, the job state will be designated as terminated.

keep_backup:
type: bool, optional
argument path: machine[BohriumContext]/remote_profile/keep_backup

keep download and upload zip

input_data:
type: dict
argument path: machine[BohriumContext]/remote_profile/input_data

Configuration of job

Resources parameters

Note

One can load, modify, and export the input file by using our effective web-based tool DP-GUI online or hosted using the command line interface dpdisp gui. All parameters below can be set in DP-GUI. By clicking “SAVE JSON”, one can download the input file.

resources:
type: dict
argument path: resources
number_node:
type: int, optional, default: 1
argument path: resources/number_node

The number of nodes needed for each job

cpu_per_node:
type: int, optional, default: 1
argument path: resources/cpu_per_node

cpu numbers of each node assigned to each job.

gpu_per_node:
type: int, optional, default: 0
argument path: resources/gpu_per_node

gpu numbers of each node assigned to each job.

queue_name:
type: str, optional, default: (empty string)
argument path: resources/queue_name

The queue name of batch job scheduler system.

group_size:
type: int
argument path: resources/group_size

The number of tasks in a job. 0 means infinity.

custom_flags:
type: typing.List[str], optional
argument path: resources/custom_flags

The extra lines passed to the job submitting script header

strategy:
type: dict, optional
argument path: resources/strategy

Strategies used to generate job submitting scripts.

if_cuda_multi_devices:
type: bool, optional, default: False
argument path: resources/strategy/if_cuda_multi_devices

If there are multiple NVIDIA GPUs on the node and we want to assign the tasks to different GPUs. If true, dpdispatcher will manually export the environment variable CUDA_VISIBLE_DEVICES differently for each task. Usually, this option is used together with the Task.task_need_resources variable.

ratio_unfinished:
type: float, optional, default: 0.0
argument path: resources/strategy/ratio_unfinished

The ratio of tasks that can be unfinished.

customized_script_header_template_file:
type: str, optional
argument path: resources/strategy/customized_script_header_template_file

The customized template file to generate job submitting script header, which overrides the default file.

para_deg:
type: int, optional, default: 1
argument path: resources/para_deg

Decide how many tasks will be run in parallel.

source_list:
type: typing.List[str], optional, default: []
argument path: resources/source_list

The env file to be sourced before the command execution.

module_purge:
type: bool, optional, default: False
argument path: resources/module_purge

Remove all modules on HPC system before module load (module_list)

module_unload_list:
type: typing.List[str], optional, default: []
argument path: resources/module_unload_list

The modules to be unloaded on HPC system before submitting jobs

module_list:
type: typing.List[str], optional, default: []
argument path: resources/module_list

The modules to be loaded on HPC system before submitting jobs

envs:
type: dict, optional, default: {}
argument path: resources/envs

The environment variables to be exported before submitting jobs

prepend_script:
type: typing.List[str], optional, default: []
argument path: resources/prepend_script

Optional script run before jobs are submitted.

append_script:
type: typing.List[str], optional, default: []
argument path: resources/append_script

Optional script run after jobs are submitted.

wait_time:
type: float | int, optional, default: 0
argument path: resources/wait_time

The waiting time in seconds after a single task is submitted

Depending on the value of batch_type, different sub args are accepted.

batch_type:
type: str (flag key)
argument path: resources/batch_type

The batch job system type loaded from machine/batch_type.

When batch_type is set to Bohrium (or its aliases bohrium, Lebesgue, lebesgue, DpCloudServer, dpcloudserver):

kwargs:
type: dict, optional
argument path: resources[Bohrium]/kwargs

This field is empty for this batch.

When batch_type is set to DistributedShell (or its alias distributedshell):

kwargs:
type: dict, optional
argument path: resources[DistributedShell]/kwargs

This field is empty for this batch.

When batch_type is set to Fugaku (or its alias fugaku):

kwargs:
type: dict, optional
argument path: resources[Fugaku]/kwargs

This field is empty for this batch.

When batch_type is set to LSF (or its alias lsf):

kwargs:
type: dict
argument path: resources[LSF]/kwargs

Extra arguments.

gpu_usage:
type: bool, optional, default: False
argument path: resources[LSF]/kwargs/gpu_usage

Choosing if GPU is used in the calculation step.

gpu_new_syntax:
type: bool, optional, default: False
argument path: resources[LSF]/kwargs/gpu_new_syntax

For LSF >= 10.1.0.3, the new option -gpu for #BSUB can be used. If False, the old syntax will be used.

gpu_exclusive:
type: bool, optional, default: True
argument path: resources[LSF]/kwargs/gpu_exclusive

Only takes effect when the new syntax is enabled. Controls whether tasks are submitted with exclusive GPU usage.

custom_gpu_line:
type: str | NoneType, optional, default: None
argument path: resources[LSF]/kwargs/custom_gpu_line

Custom GPU configuration, starting with #BSUB
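
Putting these options together, a hedged resources.json sketch for an LSF GPU job using the new -gpu syntax (the queue name and counts are illustrative):

{
    "number_node": 1,
    "cpu_per_node": 4,
    "gpu_per_node": 1,
    "queue_name": "gpu_queue",
    "group_size": 1,
    "kwargs": {
        "gpu_usage": true,
        "gpu_new_syntax": true,
        "gpu_exclusive": true
    }
}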

When batch_type is set to Slurm (or its alias slurm):

kwargs:
type: dict, optional
argument path: resources[Slurm]/kwargs

Extra arguments.

custom_gpu_line:
type: str | NoneType, optional, default: None
argument path: resources[Slurm]/kwargs/custom_gpu_line

Custom GPU configuration, starting with #SBATCH
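
For example, a hedged fragment of resources.json overriding the generated GPU line with a standard Slurm --gres request (the exact line depends on your cluster's configuration):

{
    "kwargs": {
        "custom_gpu_line": "#SBATCH --gres=gpu:2"
    }
}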

When batch_type is set to Shell (or its alias shell):

kwargs:
type: dict, optional
argument path: resources[Shell]/kwargs

This field is empty for this batch.

When batch_type is set to PBS (or its alias pbs):

kwargs:
type: dict, optional
argument path: resources[PBS]/kwargs

This field is empty for this batch.

When batch_type is set to SlurmJobArray (or its alias slurmjobarray):

kwargs:
type: dict, optional
argument path: resources[SlurmJobArray]/kwargs

Extra arguments.

custom_gpu_line:
type: str | NoneType, optional, default: None
argument path: resources[SlurmJobArray]/kwargs/custom_gpu_line

Custom GPU configuration, starting with #SBATCH

slurm_job_size:
type: int, optional, default: 1
argument path: resources[SlurmJobArray]/kwargs/slurm_job_size

Number of tasks in a Slurm job

When batch_type is set to Torque (or its alias torque):

kwargs:
type: dict, optional
argument path: resources[Torque]/kwargs

This field is empty for this batch.

When batch_type is set to OpenAPI (or its alias openapi):

kwargs:
type: dict, optional
argument path: resources[OpenAPI]/kwargs

This field is empty for this batch.

Task parameters

Note

One can load, modify, and export the input file by using our effective web-based tool DP-GUI online or hosted using the command line interface dpdisp gui. All parameters below can be set in DP-GUI. By clicking “SAVE JSON”, one can download the input file.

task:
type: dict
argument path: task
command:
type: str
argument path: task/command

The command to be executed for this task. The expected return code is 0.

task_work_path:
type: str
argument path: task/task_work_path

The dir where the command is to be executed.

forward_files:
type: typing.List[str], optional, default: []
argument path: task/forward_files

The files to be uploaded to task_work_path before the task is executed.

backward_files:
type: typing.List[str], optional, default: []
argument path: task/backward_files

The files to be downloaded to task_work_path under local_root after the task finishes.

outlog:
type: str | NoneType, optional, default: log
argument path: task/outlog

The output log file name, redirected from stdout.

errlog:
type: str | NoneType, optional, default: err
argument path: task/errlog

The error log file name, redirected from stderr.

Command line interface

dpdispatcher: Generate input scripts for HPC scheduler systems, submit these scripts to HPC systems, and monitor them until they finish

usage: dpdisp [-h] {submission,gui} ...

Valid subcommands

command

Possible choices: submission, gui

Sub-commands

submission

Handle terminated submission.

dpdisp submission [-h] [--download-terminated-log] [--download-finished-task]
                  [--clean]
                  SUBMISSION_HASH
Positional Arguments
SUBMISSION_HASH

Submission hash to download.

Actions

One or more actions to take on submission.

--download-terminated-log

Download log files of terminated tasks.

Default: False

--download-finished-task

Download finished tasks.

Default: False

--clean

Clean submission.

Default: False

gui

Serve DP-GUI.

dpdisp gui [-h] [-p PORT] [--bind_all]
Named Arguments
-p, --port

The port to serve DP-GUI on.

Default: 6042

--bind_all

Serve on all public interfaces. This will expose your DP-GUI instance to the network on both IPv4 and IPv6 (where available).

Default: False

DPDispatcher API

dpdispatcher package

class dpdispatcher.Job(job_task_list, *, resources, machine=None)[source]

Bases: object

Job is generated by Submission automatically. A job usually has many tasks and it may request computing resources from job scheduler systems. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.

Parameters:
job_task_listlist of Task

the tasks belonging to the job

resourcesResources

the machine resources. Passed from Submission when it constructs jobs.

machinemachine

machine object to execute the job. Passed from Submission when it constructs jobs.

Methods

deserialize(job_dict[, machine])

Convert the job_dict to a Submission class object.

get_job_state()

Get the jobs.

get_last_error_message()

Get last error message when the job is terminated.

serialize([if_static])

Convert the Task class instance to a dictionary.

get_hash

handle_unexpected_job_state

job_to_json

register_job_id

submit_job

classmethod deserialize(job_dict, machine=None)[source]

Convert the job_dict to a Submission class object.

Parameters:
job_dictdict

the dictionary which contains the job information

machineMachine

the machine object to execute the job

Returns:
submissionJob

the Job class instance converted from the job_dict

get_hash()[source]
get_job_state()[source]

Get the jobs. Usually, this method will query the database of slurm or pbs job scheduler system and get the results.

Notes

this method will not submit or resubmit the jobs if the job is unsubmitted.

get_last_error_message() str | None[source]

Get last error message when the job is terminated.

handle_unexpected_job_state()[source]
job_to_json()[source]
register_job_id(job_id)[source]
serialize(if_static=False)[source]

Convert the Task class instance to a dictionary.

Parameters:
if_staticbool

whether to dump the job runtime information (job_id, job_state, fail_count, job_uuid, etc.) to the dictionary.

Returns:
task_dictdict

the dictionary converted from the Task class instance

submit_job()[source]
class dpdispatcher.Machine(*args, **kwargs)[source]

Bases: object

A machine is used to handle the connection with remote machines.

Parameters:
contextSubClass derived from BaseContext

The context is used to maintain the connection with the remote machine.

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

alias: Tuple[str, ...] = ()
classmethod arginfo()[source]
bind_context(context)[source]
abstract check_finish_tag(**kwargs)[source]
check_if_recover(submission)[source]
abstract check_status(job)[source]
default_resources(res)[source]
classmethod deserialize(machine_dict)[source]
abstract do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_command_env_cuda_devices(resources)[source]
gen_script(job)[source]
gen_script_command(job)[source]
gen_script_custom_flags_lines(job)[source]
gen_script_end(job)[source]
gen_script_env(job)[source]
abstract gen_script_header(job)[source]
gen_script_run_command(job)[source]
gen_script_wait(resources)[source]
get_exit_code(job)[source]

Get exit code of the job.

Parameters:
jobJob

job

kill(job)[source]

Kill the job.

If not implemented, pass and let the user manually kill it.

Parameters:
jobJob

job

classmethod load_from_dict(machine_dict)[source]
classmethod load_from_json(json_path)[source]
classmethod load_from_yaml(yaml_path)[source]
options = {'Bohrium', 'DistributedShell', 'Fugaku', 'LSF', 'OpenAPI', 'PBS', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}
classmethod resources_arginfo() Argument[source]

Generate the resources arginfo.

Returns:
Argument

resources arginfo

classmethod resources_subfields() List[Argument][source]

Generate the resources subfields.

Returns:
list[Argument]

resources subfields

serialize(if_empty_remote_profile=False)[source]
sub_script_cmd(res)[source]
sub_script_head(res)[source]
subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}
class dpdispatcher.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]

Bases: object

Resources is used to describe the machine resources we need to do calculations.

Parameters:
number_nodeint

The number of nodes needed for each job.

cpu_per_nodeint

cpu numbers of each node.

gpu_per_nodeint

gpu numbers of each node.

queue_namestr

The queue name of batch job scheduler system.

group_sizeint

The number of tasks in a job.

custom_flagslist of Str

The extra lines passed to the job submitting script header

strategydict

Strategies used to generate job submitting scripts. if_cuda_multi_devices : bool

If there are multiple NVIDIA GPUs on the node and we want to assign the tasks to different GPUs. If true, dpdispatcher will manually export the environment variable CUDA_VISIBLE_DEVICES differently for each task. Usually, this option is used together with the Task.task_need_resources variable.

ratio_unfinishedfloat

The ratio of tasks that can be unfinished.

customized_script_header_template_filestr

The customized template file to generate job submitting script header, which overrides the default file.

para_degint

Decide how many tasks will be run in parallel. Usually run with strategy[‘if_cuda_multi_devices’]

source_listlist of Path

The env file to be sourced before the command execution.

wait_timeint

The waiting time in seconds after a single task is submitted. Default: 0.

Methods

arginfo

deserialize

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo(detail_kwargs=True)[source]
classmethod deserialize(resources_dict)[source]
classmethod load_from_dict(resources_dict)[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]
class dpdispatcher.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]

Bases: object

A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.

Parameters:
work_basePath

the base directory of the local tasks. It is usually the dir name of the project.

machineMachine

machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can still be bound after the instantiation with the bind_submission method.

resourcesResources

the machine resources (cpu or gpu) used to generate the slurm/pbs script

forward_common_fileslist

the common files to be uploaded to other computers before the jobs begin

backward_common_fileslist

the common files to be downloaded from other computers after the jobs finish

task_listlist of Task

a list of tasks to be run.

Methods

async_run_submission(**kwargs)

Async interface of run_submission.

bind_machine(machine)

Bind this submission to a machine.

check_all_finished()

Check whether all the jobs in the submission have finished.

check_ratio_unfinished(ratio_unfinished)

Calculate the ratio of unfinished tasks in the submission.

deserialize(submission_dict[, machine])

Convert the submission_dict to a Submission class object.

generate_jobs()

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs.

handle_unexpected_submission_state()

Handle unexpected job state of the submission.

run_submission(*[, dry_run, exit_on_submit, ...])

Main method to execute the submission.

serialize([if_static])

Convert the Submission class instance to a dictionary.

update_submission_state()

Check the states of all the jobs in the submission.

clean_jobs

download_jobs

get_hash

register_task

register_task_list

remove_unfinished_tasks

submission_from_json

submission_to_json

try_download_result

try_recover_from_json

upload_jobs

async async_run_submission(**kwargs)[source]

Async interface of run_submission.

Examples

>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Submission, Task
>>> async def run_jobs():
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())

This may raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.

bind_machine(machine)[source]

Bind this submission to a machine and update the machine’s context remote_root and local_root.

Parameters:
machineMachine

the machine to bind with

check_all_finished()[source]

Check whether all the jobs in the submission have finished.

Notes

This method will not handle unexpected job state in the submission.

check_ratio_unfinished(ratio_unfinished: float) bool[source]

Calculate the ratio of unfinished tasks in the submission.

Parameters:
ratio_unfinishedfloat

the ratio of unfinished tasks in the submission

Returns:
bool

whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished

clean_jobs()[source]
classmethod deserialize(submission_dict, machine=None)[source]

Convert the submission_dict to a Submission class object.

Parameters:
submission_dictdict

the dictionary which contains the submission information

machineMachine

Machine class Object to execute the jobs

Returns:
submissionSubmission

the Submission class instance converted from the submission_dict

download_jobs()[source]
generate_jobs()[source]

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The tasks are grouped into jobs randomly, with self.resources.group_size tasks per job. The tasks are shuffled randomly for load balancing. The random seed is a constant (to be concrete, 42), which ensures that the generated jobs are identical when the program is re-run.

get_hash()[source]
handle_unexpected_submission_state()[source]

Handle unexpected job state of the submission. If the job state is unsubmitted, submit the job. If the job state is terminated (killed unexpectedly), resubmit the job. If the job state is unknown, raise an error.

register_task(task)[source]
register_task_list(task_list)[source]
remove_unfinished_tasks()[source]
run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]

Main method to execute the submission. First, check whether an old submission exists on the remote machine and try to recover from it. Second, upload the local files to the remote machine where the tasks will be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission finish and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method exits. If exit_on_submit is True, the method exits right after submission.
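
A minimal usage sketch, assuming machine, resources, and task_list have been prepared as in Getting Started (the work_base value is illustrative):

>>> submission = Submission(
...     work_base="work_dir/",
...     machine=machine,
...     resources=resources,
...     task_list=task_list,
... )
>>> submission.run_submission(check_interval=60, clean=False)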

serialize(if_static=False)[source]

Convert the Submission class instance to a dictionary.

Parameters:
if_staticbool

whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.

Returns:
submission_dictdict

the dictionary converted from the Submission class instance

classmethod submission_from_json(json_file_name='submission.json')[source]
submission_to_json()[source]
try_download_result()[source]
try_recover_from_json()[source]
update_submission_state()[source]

Check the states of all the jobs in the submission.

Notes

this method will not handle unexpected (like resubmit terminated) job state in the submission.

upload_jobs()[source]
class dpdispatcher.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]

Bases: object

A task is a sequential command to be executed, together with the files it depends on, which are transmitted forward and backward.

Parameters:
commandStr

the command to be executed.

task_work_pathPath

the working directory of the task, where the files it depends on are located.

forward_fileslist of Path

the files to be transmitted to the remote machine before the command executes.

backward_fileslist of Path

the files to be transmitted from the remote machine after the command finishes.

outlogStr

the filename to which the command redirects stdout

errlogStr

the filename to which the command redirects stderr

Methods

deserialize(task_dict)

Convert the task_dict to a Task class object.

get_task_state(context)

Get the task state by checking the tag file.

arginfo

get_hash

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo()[source]
classmethod deserialize(task_dict)[source]

Convert the task_dict to a Task class object.

Parameters:
task_dictdict

the dictionary which contains the task information

Returns:
taskTask

the Task class instance converted from the task_dict

get_hash()[source]
get_task_state(context)[source]

Get the task state by checking the tag file.

Parameters:
contextContext

the context of the task

classmethod load_from_dict(task_dict: dict) Task[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]

Subpackages

dpdispatcher.contexts package

Contexts.

Submodules
dpdispatcher.contexts.dp_cloud_server_context module
class dpdispatcher.contexts.dp_cloud_server_context.BohriumContext(*args, **kwargs)[source]

Bases: BaseContext

Methods

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

bind_submission

check_file_exists

check_finish

check_home_file_exits

clean

download

load_from_dict

read_file

read_home_file

upload

upload_job

write_file

write_home_file

write_local_file

alias: Tuple[str, ...] = ('DpCloudServerContext', 'LebesgueContext')
bind_submission(submission)[source]
check_file_exists(fname)[source]
check_home_file_exits(fname)[source]
clean()[source]
download(submission)[source]
classmethod load_from_dict(context_dict)[source]
classmethod machine_subfields() List[Argument][source]

Generate the machine subfields.

Returns:
list[Argument]

machine subfields

read_file(fname)[source]
read_home_file(fname)[source]
upload(submission)[source]
upload_job(job, common_files=None)[source]
write_file(fname, write_str)[source]
write_home_file(fname, write_str)[source]
write_local_file(fname, write_str)[source]
dpdispatcher.contexts.dp_cloud_server_context.DpCloudServerContext

alias of BohriumContext

dpdispatcher.contexts.dp_cloud_server_context.LebesgueContext

alias of BohriumContext

dpdispatcher.contexts.hdfs_context module
class dpdispatcher.contexts.hdfs_context.HDFSContext(*args, **kwargs)[source]

Bases: BaseContext

Methods

check_file_exists(fname)

Check whether the given file exists, often used in checking whether the belonging job has finished.

download(submission[, check_exists, ...])

Download backward files from HDFS root dir.

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

upload(submission[, dereference])

Upload forward files and forward command files to HDFS root dir.

bind_submission

check_finish

clean

get_job_root

load_from_dict

read_file

write_file

bind_submission(submission)[source]
check_file_exists(fname)[source]

Check whether the given file exists, often used in checking whether the belonging job has finished.

Parameters:
fnamestring

file name to be checked

Returns:
status: boolean
clean()[source]
download(submission, check_exists=False, mark_failure=True, back_error=False)[source]

Download backward files from HDFS root dir.

Parameters:
submissionSubmission class instance

represents a collection of tasks, such as backward file names

check_existsbool

whether to check if the file exists

mark_failurebool

whether to mark the task as failed if the file does not exist

back_errorbool

whether to download error files

Returns:
none
get_job_root()[source]
classmethod load_from_dict(context_dict)[source]
read_file(fname)[source]
upload(submission, dereference=True)[source]

Upload forward files and forward command files to HDFS root dir.

Parameters:
submissionSubmission class instance

represents a collection of tasks, such as forward file names

dereferencebool

whether to dereference symbolic links

Returns:
none
write_file(fname, write_str)[source]
dpdispatcher.contexts.lazy_local_context module
class dpdispatcher.contexts.lazy_local_context.LazyLocalContext(*args, **kwargs)[source]

Bases: BaseContext

Run jobs in the local server and local directory.

Parameters:
local_rootstr

The local directory to store the jobs.

remote_rootstr, optional

The argument takes no effect.

remote_profiledict, optional

The remote profile. The default is {}.

*args

The arguments.

**kwargs

The keyword arguments.

Methods

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

bind_submission

block_call

block_checkcall

call

check_file_exists

check_finish

clean

download

get_job_root

get_return

load_from_dict

read_file

upload

write_file

bind_submission(submission)[source]
block_call(cmd)[source]
block_checkcall(cmd)[source]
call(cmd)[source]
check_file_exists(fname)[source]
check_finish(proc)[source]
clean()[source]
download(jobs, check_exists=False, mark_failure=True, back_error=False)[source]
get_job_root()[source]
get_return(proc)[source]
classmethod load_from_dict(context_dict)[source]
read_file(fname)[source]
upload(jobs, dereference=True)[source]
write_file(fname, write_str)[source]
class dpdispatcher.contexts.lazy_local_context.SPRetObj(ret)[source]

Bases: object

Methods

read

readlines

read()[source]
readlines()[source]
dpdispatcher.contexts.local_context module
class dpdispatcher.contexts.local_context.LocalContext(*args, **kwargs)[source]

Bases: BaseContext

Run jobs in the local server and remote directory.

Parameters:
local_rootstr

The local directory to store the jobs.

remote_rootstr

The remote directory to store the jobs.

remote_profiledict, optional

The remote profile. The default is {}.

*args

The arguments.

**kwargs

The keyword arguments.

Methods

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

bind_submission

block_call

block_checkcall

call

check_file_exists

check_finish

clean

download

get_job_root

get_return

load_from_dict

read_file

upload

write_file

bind_submission(submission)[source]
block_call(cmd)[source]
block_checkcall(cmd)[source]
call(cmd)[source]
check_file_exists(fname)[source]
check_finish(proc)[source]
clean()[source]
download(submission, check_exists=False, mark_failure=True, back_error=False)[source]
get_job_root()[source]
get_return(proc)[source]
classmethod load_from_dict(context_dict)[source]
read_file(fname)[source]
upload(submission)[source]
write_file(fname, write_str)[source]
class dpdispatcher.contexts.local_context.SPRetObj(ret)[source]

Bases: object

Methods

read

readlines

read()[source]
readlines()[source]
dpdispatcher.contexts.openapi_context module
class dpdispatcher.contexts.openapi_context.OpenAPIContext(*args, **kwargs)[source]

Bases: BaseContext

Methods

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

bind_submission

check_file_exists

check_finish

check_home_file_exits

clean

download

load_from_dict

read_file

read_home_file

upload

upload_job

write_file

write_home_file

write_local_file

bind_submission(submission)[source]
check_file_exists(fname)[source]
check_home_file_exits(fname)[source]
clean()[source]
download(submission)[source]
classmethod load_from_dict(context_dict)[source]
read_file(fname)[source]
read_home_file(fname)[source]
upload(submission)[source]
upload_job(job, common_files=None)[source]
write_file(fname, write_str)[source]
write_home_file(fname, write_str)[source]
write_local_file(fname, write_str)[source]
dpdispatcher.contexts.ssh_context module
class dpdispatcher.contexts.ssh_context.SSHContext(*args, **kwargs)[source]

Bases: BaseContext

Attributes:
sftp
ssh

Methods

block_checkcall(cmd[, asynchronously, ...])

Run command with arguments.

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

bind_submission

block_call

call

check_file_exists

check_finish

clean

close

download

get_job_root

get_return

list_remote_dir

load_from_dict

read_file

upload

write_file

bind_submission(submission)[source]
block_call(cmd)[source]
block_checkcall(cmd, asynchronously=False, stderr_whitelist=None)[source]

Run command with arguments. Wait for command to complete. If the return code was zero then return, otherwise raise RuntimeError.

Parameters:
cmdstr

The command to run.

asynchronouslybool, optional, default=False

Run command asynchronously. If True, nohup will be used to run the command.

stderr_whitelistlist of str, optional, default=None

If not None, the stderr will be checked against the whitelist. If the stderr contains any of the strings in the whitelist, the command will be considered successful.

call(cmd)[source]
check_file_exists(fname)[source]
check_finish(cmd_pipes)[source]
clean()[source]
close()[source]
download(submission, check_exists=False, mark_failure=True, back_error=False)[source]
get_job_root()[source]
get_return(cmd_pipes)[source]
list_remote_dir(sftp, remote_dir, ref_remote_root, result_list)[source]
classmethod load_from_dict(context_dict)[source]
classmethod machine_subfields() List[Argument][source]

Generate the machine subfields.

Returns:
list[Argument]

machine subfields

read_file(fname)[source]
property sftp
property ssh
upload(submission, dereference=True)[source]
write_file(fname, write_str)[source]
class dpdispatcher.contexts.ssh_context.SSHSession(hostname, username, password=None, port=22, key_filename=None, passphrase=None, timeout=10, totp_secret=None, tar_compress=True, look_for_keys=True)[source]

Bases: object

Attributes:
remote
rsync_available
sftp

Returns sftp.

Methods

inter_handler(title, instructions, prompt_list)

inter_handler: the callback for paramiko.transport.auth_interactive.

arginfo

close

ensure_alive

exec_command

get

get_ssh_client

put

static arginfo()[source]
close()[source]
ensure_alive(max_check=10, sleep_time=10)[source]
exec_command(**kwargs)
get(from_f, to_f)[source]
get_ssh_client()[source]
inter_handler(title, instructions, prompt_list)[source]

inter_handler: the callback for paramiko.transport.auth_interactive.

The prototype for this function is defined by Paramiko, so all of the arguments need to be there, even though we don’t use ‘title’ or ‘instructions’.

The function is expected to return a tuple of data containing the responses to the provided prompts. Experimental results suggest that there will be one call of this function per prompt, but the mechanism allows for multiple prompts to be sent at once, so it’s best to assume that can happen.

Since tuples can’t really be built on the fly, the responses are collected in a list which is then converted to a tuple when it’s time to return a value.

Experiments suggest that the username prompt never happens. This makes sense, but the Username prompt is included here just in case.

put(from_f, to_f)[source]
property remote: str
property rsync_available: bool
property sftp

Returns sftp. Open a new one if not existing.

dpdispatcher.entrypoints package

Entry points.

Submodules
dpdispatcher.entrypoints.gui module

DP-GUI entrypoint.

dpdispatcher.entrypoints.gui.start_dpgui(*, port: int, bind_all: bool, **kwargs)[source]

Host DP-GUI server.

Parameters:
portint

The port to serve DP-GUI on.

bind_allbool

Serve on all public interfaces. This will expose your DP-GUI instance to the network on both IPv4 and IPv6 (where available).

**kwargs

additional arguments

Raises:
ModuleNotFoundError

The dpgui package is not installed

dpdispatcher.entrypoints.submission module
dpdispatcher.entrypoints.submission.handle_submission(*, submission_hash: str, download_terminated_log: bool = False, download_finished_task: bool = False, clean: bool = False)[source]

Handle terminated submission.

Parameters:
submission_hashstr

Submission hash to download.

download_terminated_logbool, optional

Download log files of terminated tasks.

download_finished_taskbool, optional

Download finished tasks.

cleanbool, optional

Clean submission.

Raises:
ValueError

At least one action should be specified.

dpdispatcher.machines package

Machines.

Submodules
dpdispatcher.machines.distributed_shell module
class dpdispatcher.machines.distributed_shell.DistributedShell(*args, **kwargs)[source]

Bases: Machine

Methods

do_submit(job)

Submit the job to Yarn using distributed shell.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_status(job)[source]
do_submit(job)[source]

Submit the job to Yarn using distributed shell.

Parameters:
jobJob class instance

job to be submitted

Returns:
job_id: string

submit process id

gen_script_end(job)[source]
gen_script_env(job)[source]
gen_script_header(job)[source]
dpdispatcher.machines.dp_cloud_server module
class dpdispatcher.machines.dp_cloud_server.Bohrium(*args, **kwargs)[source]

Bases: Machine

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_local_script

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

map_dp_job_state

serialize

sub_script_cmd

sub_script_head

alias: Tuple[str, ...] = ('Lebesgue', 'DpCloudServer')
check_finish_tag(job)[source]
check_if_recover(submission)[source]
check_status(job)[source]
do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_local_script(job)[source]
gen_script(job)[source]
gen_script_header(job)[source]
get_exit_code(job) int[source]

Get exit code of the job.

Parameters:
jobJob

job

kill(job)[source]

Kill the job.

Parameters:
jobJob

job

static map_dp_job_state(status, exit_code, ignore_exit_code=True)[source]
dpdispatcher.machines.dp_cloud_server.DpCloudServer

alias of Bohrium

dpdispatcher.machines.dp_cloud_server.Lebesgue

alias of Bohrium

dpdispatcher.machines.fugaku module
class dpdispatcher.machines.fugaku.Fugaku(*args, **kwargs)[source]

Bases: Machine

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_status(job)[source]
default_resources(resources)[source]
do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_script(job)[source]
gen_script_header(job)[source]
dpdispatcher.machines.lsf module
class dpdispatcher.machines.lsf.LSF(*args, **kwargs)[source]

Bases: Machine

LSF batch.

Methods

default_resources(resources)

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

deserialize

do_submit

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_status(**kwargs)
default_resources(resources)[source]
do_submit(**kwargs)

Submit a single job, assuming that no job is running there.

gen_script(job)[source]
gen_script_header(job)[source]
kill(job)[source]

Kill the job.

Parameters:
jobJob

job

classmethod resources_subfields() List[Argument][source]

Generate the resources subfields.

Returns:
list[Argument]

resources subfields

sub_script_cmd(res)[source]
sub_script_head(res)[source]
dpdispatcher.machines.openapi module
class dpdispatcher.machines.openapi.OpenAPI(*args, **kwargs)[source]

Bases: Machine

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_local_script

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

map_dp_job_state

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_if_recover(submission)[source]
check_status(job)[source]
do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_local_script(job)[source]
gen_script(job)[source]
gen_script_header(job)[source]
get_exit_code(job)[source]

Get exit code of the job.

Parameters:
jobJob

job

Returns:
int

exit code

kill(job)[source]

Kill the job.

Parameters:
jobJob

job

static map_dp_job_state(status, exit_code, ignore_exit_code=True)[source]
dpdispatcher.machines.pbs module
class dpdispatcher.machines.pbs.PBS(*args, **kwargs)[source]

Bases: Machine

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_status(job)[source]
default_resources(resources)[source]
do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_script(job)[source]
gen_script_header(job)[source]
kill(job)[source]

Kill the job.

Parameters:
job : Job

job

class dpdispatcher.machines.pbs.Torque(*args, **kwargs)[source]

Bases: PBS

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_status(job)[source]
gen_script_header(job)[source]
dpdispatcher.machines.shell module
class dpdispatcher.machines.shell.Shell(*args, **kwargs)[source]

Bases: Machine

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_status(job)[source]
default_resources(resources)[source]
do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_script(job)[source]
gen_script_header(job)[source]
kill(job)[source]

Kill the job.

Parameters:
job : Job

job

dpdispatcher.machines.slurm module
class dpdispatcher.machines.slurm.Slurm(*args, **kwargs)[source]

Bases: Machine

Methods

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

do_submit

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_status(**kwargs)
default_resources(resources)[source]
do_submit(**kwargs)

Submit a single job, assuming that no job is running there.

gen_script(job)[source]
gen_script_header(job)[source]
kill(job)[source]

Kill the job.

Parameters:
job : Job

job

classmethod resources_subfields() List[Argument][source]

Generate the resources subfields.

Returns:
list[Argument]

resources subfields

class dpdispatcher.machines.slurm.SlurmJobArray(*args, **kwargs)[source]

Bases: Slurm

Slurm with job array enabled for multiple tasks in a job.

Methods

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

do_submit

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

check_finish_tag(job)[source]
check_status(**kwargs)
gen_script_command(job)[source]
gen_script_end(job)[source]
gen_script_header(job)[source]
classmethod resources_subfields() List[Argument][source]

Generate the resources subfields.

Returns:
list[Argument]

resources subfields

dpdispatcher.utils package

Utils.

Subpackages
dpdispatcher.utils.dpcloudserver package
class dpdispatcher.utils.dpcloudserver.Client(email=None, password=None, debug=False, ticket=None, base_url='https://bohrium.dp.tech/')[source]

Bases: object

Methods

download

download_from_url

get

get_job_detail

get_job_result_url

get_log

get_tasks_list

job_create

kill

post

refresh_token

upload

download(oss_file, save_file, endpoint, bucket_name)[source]
download_from_url(url, save_file)[source]
get(url, header=None, params=None, retry=5)[source]
get_job_detail(job_id)[source]
get_job_result_url(job_id)[source]
get_log(job_id)[source]
get_tasks_list(group_id, per_page=30)[source]
job_create(job_type, oss_path, input_data, program_id=None, group_id=None)[source]
kill(job_id)[source]
post(url, data=None, header=None, params=None, retry=5)[source]
refresh_token(retry=3)[source]
upload(oss_task_zip, zip_task_file, endpoint, bucket_name)[source]
Submodules
dpdispatcher.utils.dpcloudserver.client module
class dpdispatcher.utils.dpcloudserver.client.Client(email=None, password=None, debug=False, ticket=None, base_url='https://bohrium.dp.tech/')[source]

Bases: object

Methods

download

download_from_url

get

get_job_detail

get_job_result_url

get_log

get_tasks_list

job_create

kill

post

refresh_token

upload

download(oss_file, save_file, endpoint, bucket_name)[source]
download_from_url(url, save_file)[source]
get(url, header=None, params=None, retry=5)[source]
get_job_detail(job_id)[source]
get_job_result_url(job_id)[source]
get_log(job_id)[source]
get_tasks_list(group_id, per_page=30)[source]
job_create(job_type, oss_path, input_data, program_id=None, group_id=None)[source]
kill(job_id)[source]
post(url, data=None, header=None, params=None, retry=5)[source]
refresh_token(retry=3)[source]
upload(oss_task_zip, zip_task_file, endpoint, bucket_name)[source]
exception dpdispatcher.utils.dpcloudserver.client.RequestInfoException[source]

Bases: Exception

dpdispatcher.utils.dpcloudserver.config module
dpdispatcher.utils.dpcloudserver.retcode module
class dpdispatcher.utils.dpcloudserver.retcode.RETCODE[source]

Bases: object

DATAERR = '2002'
DBERR = '2000'
IOERR = '2003'
NODATA = '2300'
OK = '0000'
PARAMERR = '2101'
PWDERR = '2104'
REQERR = '2200'
ROLEERR = '2103'
THIRDERR = '2001'
TOKENINVALID = '2100'
UNDERDEBUG = '2301'
UNKOWNERR = '2400'
USERERR = '2102'
VERIFYERR = '2105'
dpdispatcher.utils.dpcloudserver.zip_file module
dpdispatcher.utils.dpcloudserver.zip_file.unzip_file(zip_file, out_dir='./')[source]
dpdispatcher.utils.dpcloudserver.zip_file.zip_file_list(root_path, zip_filename, file_list=[])[source]
Submodules
dpdispatcher.utils.hdfs_cli module
class dpdispatcher.utils.hdfs_cli.HDFS[source]

Bases: object

Fundamental class for HDFS basic manipulation.

Methods

copy_from_local(local_path, to_uri)

Returns: True on success Raises: on unexpected error.

exists(uri)

Check existence of hdfs uri Returns: True on exists Raises: RuntimeError.

mkdir(uri)

Make new hdfs directory Returns: True on success Raises: RuntimeError.

remove(uri)

Remove the hdfs uri. Returns: True on success Raises: RuntimeError.

copy_to_local

move

read_hdfs_file

static copy_from_local(local_path, to_uri)[source]

Returns: True on success Raises: on unexpected error.

static copy_to_local(from_uri, local_path)[source]
static exists(uri)[source]

Check existence of hdfs uri Returns: True on exists Raises: RuntimeError.

static mkdir(uri)[source]

Make new hdfs directory Returns: True on success Raises: RuntimeError.

static move(from_uri, to_uri)[source]
static read_hdfs_file(uri)[source]
static remove(uri)[source]

Remove the hdfs uri. Returns: True on success Raises: RuntimeError.
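
A hedged usage sketch of the static helpers above, assuming an HDFS client is available in the environment; the URI below is a hypothetical path:

>>> from dpdispatcher.utils.hdfs_cli import HDFS
>>> uri = "hdfs:///user/demo/dpdispatcher_work_dir"  # hypothetical target directory
>>> if not HDFS.exists(uri):          # raises RuntimeError if the check itself fails
...     HDFS.mkdir(uri)               # create the remote directory
>>> ok = HDFS.copy_from_local("example.txt", uri)          # upload a local file
>>> content = HDFS.read_hdfs_file(uri + "/example.txt")    # read it back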

dpdispatcher.utils.job_status module
class dpdispatcher.utils.job_status.JobStatus(value)[source]

Bases: IntEnum

An enumeration.

completing = 6
finished = 5
running = 3
terminated = 4
unknown = 100
unsubmitted = 1
waiting = 2
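
Since JobStatus is an IntEnum, its members behave like integers and can be compared directly; a small illustration (the status value here is chosen by hand, not taken from a real scheduler):

>>> from dpdispatcher.utils.job_status import JobStatus
>>> state = JobStatus.running
>>> state.name, state.value
('running', 3)
>>> state in (JobStatus.finished, JobStatus.terminated)
False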
dpdispatcher.utils.record module
dpdispatcher.utils.utils module
exception dpdispatcher.utils.utils.RetrySignal[source]

Bases: Exception

Exception to give a signal to retry the function.

dpdispatcher.utils.utils.customized_script_header_template(filename: PathLike, resources: Resources) str[source]
dpdispatcher.utils.utils.generate_totp(secret: str, period: int = 30, token_length: int = 6) str[source]

Generate time-based one time password (TOTP) from the secret.

Some HPCs use TOTP for two-factor authentication for safety.

Parameters:
secret : str

The encoded secret provided by the HPC. It’s usually extracted from a 2D code and base32 encoded.

period : int, default=30

Time period where the code is valid in seconds.

token_length : int, default=6

The token length.

Returns:
token: str

The generated token.

References

https://github.com/lepture/otpauth/blob/49914d83d36dbcd33c9e26f65002b21ce09a6303/otpauth.py#L143-L160
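
A hedged usage sketch; the secret below is a made-up base32 string, not a real credential:

>>> from dpdispatcher.utils.utils import generate_totp
>>> secret = "JBSWY3DPEHPK3PXP"  # hypothetical secret extracted from the HPC's 2D code
>>> token = generate_totp(secret)  # 6-digit code, valid for roughly 30 seconds
>>> longer_token = generate_totp(secret, period=60, token_length=8)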

dpdispatcher.utils.utils.get_sha256(filename)[source]

Get sha256 of a file.

Parameters:
filename : str

The filename.

Returns:
sha256: str

The sha256.

dpdispatcher.utils.utils.hotp(key: str, period: int, token_length: int = 6, digest='sha1')[source]
dpdispatcher.utils.utils.retry(max_retry: int = 3, sleep: int | float = 60, catch_exception: ~typing.Type[BaseException] = <class 'dpdispatcher.utils.utils.RetrySignal'>) Callable[source]

Retry the function until it succeeds or fails for certain times.

Parameters:
max_retry : int, default=3

The maximum retry times. If None, it will retry forever.

sleep : int or float, default=60

The sleep time in seconds.

catch_exception : Exception, default=RetrySignal

The exception to catch.

Returns:
decorator: Callable

The decorator.

Examples

>>> @retry(max_retry=3, sleep=60, catch_exception=RetrySignal)
... def func():
...     raise RetrySignal("Failed")
dpdispatcher.utils.utils.rsync(from_file: str, to_file: str, port: int = 22, key_filename: str | None = None, timeout: int | float = 10)[source]

Call rsync to transfer files.

Parameters:
from_file : str

SRC

to_file : str

DEST

port : int, default=22

port for ssh

key_filename : str, optional

identity file name

timeout : int, default=10

timeout for ssh

Raises:
RuntimeError

when return code is not 0
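
A minimal sketch of calling this helper directly; the host, paths, and key file are placeholders:

>>> from dpdispatcher.utils.utils import rsync
>>> rsync(
...     "graph.pb",
...     "user@remote.example.org:/tmp/dpdispatcher_work_dir/graph.pb",
...     port=22,
...     key_filename="/home/user/.ssh/id_rsa",
...     timeout=10,
... )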

dpdispatcher.utils.utils.run_cmd_with_all_output(cmd, shell=True)[source]

Submodules

dpdispatcher.arginfo module

dpdispatcher.base_context module

class dpdispatcher.base_context.BaseContext(*args, **kwargs)[source]

Bases: object

Methods

machine_arginfo()

Generate the machine arginfo.

machine_subfields()

Generate the machine subfields.

bind_submission

check_finish

clean

download

load_from_dict

read_file

upload

write_file

alias: Tuple[str, ...] = ()
bind_submission(submission)[source]
check_finish(proc)[source]
abstract clean()[source]
abstract download(submission, check_exists=False, mark_failure=True, back_error=False)[source]
classmethod load_from_dict(context_dict)[source]
classmethod machine_arginfo() Argument[source]

Generate the machine arginfo.

Returns:
Argument

machine arginfo

classmethod machine_subfields() List[Argument][source]

Generate the machine subfields.

Returns:
list[Argument]

machine subfields

options = {'BohriumContext', 'HDFSContext', 'LazyLocalContext', 'LocalContext', 'OpenAPIContext', 'SSHContext'}
abstract read_file(fname)[source]
subclasses_dict = {'Bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'BohriumContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServer': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'DpCloudServerContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'HDFS': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'HDFSContext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'LazyLocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'LazyLocalContext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'Lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'LebesgueContext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'Local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'LocalContext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'OpenAPI': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'OpenAPIContext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'SSH': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'SSHContext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'bohrium': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'bohriumcontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudserver': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'dpcloudservercontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'hdfs': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'hdfscontext': <class 'dpdispatcher.contexts.hdfs_context.HDFSContext'>, 'lazylocal': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lazylocalcontext': <class 'dpdispatcher.contexts.lazy_local_context.LazyLocalContext'>, 'lebesgue': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'lebesguecontext': <class 'dpdispatcher.contexts.dp_cloud_server_context.BohriumContext'>, 'local': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'localcontext': <class 'dpdispatcher.contexts.local_context.LocalContext'>, 'openapi': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'openapicontext': <class 'dpdispatcher.contexts.openapi_context.OpenAPIContext'>, 'ssh': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>, 'sshcontext': <class 'dpdispatcher.contexts.ssh_context.SSHContext'>}
abstract upload(submission)[source]
abstract write_file(fname, write_str)[source]

dpdispatcher.dlog module

dpdispatcher.dpdisp module

dpdispatcher.dpdisp.main()[source]
dpdispatcher.dpdisp.main_parser() ArgumentParser[source]

Dpdispatcher commandline options argument parser.

Returns:
argparse.ArgumentParser

the argument parser

Notes

This function is used by documentation.

dpdispatcher.dpdisp.parse_args(args: List[str] | None = None)[source]

Dpdispatcher commandline options argument parsing.

Parameters:
args : List[str]

list of command line arguments, mainly used for testing. The default value None takes the arguments from sys.argv.

dpdispatcher.machine module

class dpdispatcher.machine.Machine(*args, **kwargs)[source]

Bases: object

A machine is used to handle the connection with remote machines.

Parameters:
context : SubClass derived from BaseContext

The context is used to maintain the connection with the remote machine.

Methods

do_submit(job)

Submit a single job, assuming that no job is running there.

get_exit_code(job)

Get exit code of the job.

kill(job)

Kill the job.

resources_arginfo()

Generate the resources arginfo.

resources_subfields()

Generate the resources subfields.

arginfo

bind_context

check_finish_tag

check_if_recover

check_status

default_resources

deserialize

gen_command_env_cuda_devices

gen_script

gen_script_command

gen_script_custom_flags_lines

gen_script_end

gen_script_env

gen_script_header

gen_script_run_command

gen_script_wait

load_from_dict

load_from_json

load_from_yaml

serialize

sub_script_cmd

sub_script_head

alias: Tuple[str, ...] = ()
classmethod arginfo()[source]
bind_context(context)[source]
abstract check_finish_tag(**kwargs)[source]
check_if_recover(submission)[source]
abstract check_status(job)[source]
default_resources(res)[source]
classmethod deserialize(machine_dict)[source]
abstract do_submit(job)[source]

Submit a single job, assuming that no job is running there.

gen_command_env_cuda_devices(resources)[source]
gen_script(job)[source]
gen_script_command(job)[source]
gen_script_custom_flags_lines(job)[source]
gen_script_end(job)[source]
gen_script_env(job)[source]
abstract gen_script_header(job)[source]
gen_script_run_command(job)[source]
gen_script_wait(resources)[source]
get_exit_code(job)[source]

Get exit code of the job.

Parameters:
job : Job

job

kill(job)[source]

Kill the job.

If not implemented, pass and let the user manually kill it.

Parameters:
job : Job

job

classmethod load_from_dict(machine_dict)[source]
classmethod load_from_json(json_path)[source]
classmethod load_from_yaml(yaml_path)[source]
options = {'Bohrium', 'DistributedShell', 'Fugaku', 'LSF', 'OpenAPI', 'PBS', 'Shell', 'Slurm', 'SlurmJobArray', 'Torque'}
classmethod resources_arginfo() Argument[source]

Generate the resources arginfo.

Returns:
Argument

resources arginfo

classmethod resources_subfields() List[Argument][source]

Generate the resources subfields.

Returns:
list[Argument]

resources subfields

serialize(if_empty_remote_profile=False)[source]
sub_script_cmd(res)[source]
sub_script_head(res)[source]
subclasses_dict = {'Bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'DistributedShell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'DpCloudServer': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'Fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'LSF': <class 'dpdispatcher.machines.lsf.LSF'>, 'Lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'OpenAPI': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'PBS': <class 'dpdispatcher.machines.pbs.PBS'>, 'Shell': <class 'dpdispatcher.machines.shell.Shell'>, 'Slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'SlurmJobArray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'Torque': <class 'dpdispatcher.machines.pbs.Torque'>, 'bohrium': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'distributedshell': <class 'dpdispatcher.machines.distributed_shell.DistributedShell'>, 'dpcloudserver': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'fugaku': <class 'dpdispatcher.machines.fugaku.Fugaku'>, 'lebesgue': <class 'dpdispatcher.machines.dp_cloud_server.Bohrium'>, 'lsf': <class 'dpdispatcher.machines.lsf.LSF'>, 'openapi': <class 'dpdispatcher.machines.openapi.OpenAPI'>, 'pbs': <class 'dpdispatcher.machines.pbs.PBS'>, 'shell': <class 'dpdispatcher.machines.shell.Shell'>, 'slurm': <class 'dpdispatcher.machines.slurm.Slurm'>, 'slurmjobarray': <class 'dpdispatcher.machines.slurm.SlurmJobArray'>, 'torque': <class 'dpdispatcher.machines.pbs.Torque'>}
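
The alias tables above (options and subclasses_dict) are what make the batch_type and context_type strings in machine.json case-insensitive. A minimal sketch of resolving a machine from a plain dictionary, assuming a LazyLocal context that only needs local_root:

>>> from dpdispatcher.machine import Machine
>>> machine = Machine.load_from_dict(
...     {
...         "batch_type": "shell",        # alias of Shell
...         "context_type": "lazylocal",  # alias of LazyLocalContext
...         "local_root": "./",
...     }
... )
>>> type(machine).__name__
'Shell'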

dpdispatcher.submission module

class dpdispatcher.submission.Job(job_task_list, *, resources, machine=None)[source]

Bases: object

Job is generated by Submission automatically. A job usually contains many tasks and may request computing resources from the job scheduler system. Each Job can generate a script file to be submitted to the job scheduler system or executed locally.

Parameters:
job_task_list : list of Task

the tasks belonging to the job

resources : Resources

the machine resources. Passed from Submission when it constructs jobs.

machine : Machine

machine object to execute the job. Passed from Submission when it constructs jobs.

Methods

deserialize(job_dict[, machine])

Convert the job_dict to a Submission class object.

get_job_state()

Get the job state.

get_last_error_message()

Get last error message when the job is terminated.

serialize([if_static])

Convert the Task class instance to a dictionary.

get_hash

handle_unexpected_job_state

job_to_json

register_job_id

submit_job

classmethod deserialize(job_dict, machine=None)[source]

Convert the job_dict to a Submission class object.

Parameters:
job_dict : dict

the dictionary which contains the job information

machine : Machine

the machine object to execute the job

Returns:
submission : Job

the Job class instance converted from the job_dict

get_hash()[source]
get_job_state()[source]

Get the job state. Usually, this method queries the database of the Slurm or PBS job scheduler system and gets the result.

Notes

This method will not submit or resubmit the job if it is unsubmitted.

get_last_error_message() str | None[source]

Get last error message when the job is terminated.

handle_unexpected_job_state()[source]
job_to_json()[source]
register_job_id(job_id)[source]
serialize(if_static=False)[source]

Convert the Task class instance to a dictionary.

Parameters:
if_static : bool

whether to dump the job runtime information (job_id, job_state, fail_count, job_uuid, etc.) to the dictionary.

Returns:
task_dict : dict

the dictionary converted from the Task class instance

submit_job()[source]
class dpdispatcher.submission.Resources(number_node, cpu_per_node, gpu_per_node, queue_name, group_size, *, custom_flags=[], strategy={'if_cuda_multi_devices': False, 'ratio_unfinished': 0.0}, para_deg=1, module_unload_list=[], module_purge=False, module_list=[], source_list=[], envs={}, prepend_script=[], append_script=[], wait_time=0, **kwargs)[source]

Bases: object

Resources is used to describe the machine resources we need to do calculations.

Parameters:
number_node : int

The number of nodes needed for each job.

cpu_per_node : int

The number of CPUs on each node.

gpu_per_node : int

The number of GPUs on each node.

queue_name : str

The queue name of the batch job scheduler system.

group_size : int

The number of tasks in a job.

custom_flags : list of str

Extra lines passed to the job submission script header.

strategy : dict

Strategies used to generate the job submission scripts.

if_cuda_multi_devices : bool

If there are multiple NVIDIA GPUs on the node and the tasks should be assigned to different GPUs. If true, dpdispatcher will export the environment variable CUDA_VISIBLE_DEVICES with a different value for each task. Usually, this option is used together with the Task.task_need_resources variable.

ratio_unfinished : float

The ratio of tasks that are allowed to be unfinished.

customized_script_header_template_file : str

The customized template file used to generate the job submission script header, which overrides the default file.

para_deg : int

How many tasks will be run in parallel. Usually used with strategy['if_cuda_multi_devices'].

source_list : list of Path

The env files to be sourced before the command execution.

wait_time : int

The waiting time in seconds after a single task is submitted. Default: 0.

Methods

arginfo

deserialize

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo(detail_kwargs=True)[source]
classmethod deserialize(resources_dict)[source]
classmethod load_from_dict(resources_dict)[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]
class dpdispatcher.submission.Submission(work_base, machine=None, resources=None, forward_common_files=[], backward_common_files=[], *, task_list=[])[source]

Bases: object

A submission represents a collection of tasks. These tasks are usually located in a common directory, and they may share common files to be uploaded and downloaded.

Parameters:
work_base : Path

the base directory of the local tasks; it is usually the directory name of the project.

machine : Machine

machine class object (for example, PBS, Slurm, Shell) to execute the jobs. The machine can also be bound after instantiation with the bind_submission method.

resources : Resources

the machine resources (CPU or GPU) used to generate the Slurm/PBS script

forward_common_files : list

the common files to be uploaded to other computers before the jobs begin

backward_common_files : list

the common files to be downloaded from other computers after the jobs finish

task_list : list of Task

a list of tasks to be run.

Methods

async_run_submission(**kwargs)

Async interface of run_submission.

bind_machine(machine)

Bind this submission to a machine.

check_all_finished()

Check whether all the jobs in the submission have finished.

check_ratio_unfinished(ratio_unfinished)

Calculate the ratio of unfinished tasks in the submission.

deserialize(submission_dict[, machine])

Convert the submission_dict to a Submission class object.

generate_jobs()

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs.

handle_unexpected_submission_state()

Handle unexpected job state of the submission.

run_submission(*[, dry_run, exit_on_submit, ...])

Main method to execute the submission.

serialize([if_static])

Convert the Submission class instance to a dictionary.

update_submission_state()

Check and update the states of all the jobs in the submission.

clean_jobs

download_jobs

get_hash

register_task

register_task_list

remove_unfinished_tasks

submission_from_json

submission_to_json

try_download_result

try_recover_from_json

upload_jobs

async async_run_submission(**kwargs)[source]

Async interface of run_submission.

Examples

>>> import asyncio
>>> from dpdispatcher import Machine, Resources, Task, Submission
>>> async def run_jobs():
...     background_tasks = set()
...     # task1
...     task1 = Task(...)
...     submission1 = Submission(..., task_list=[task1])
...     background_task = asyncio.create_task(
...         submission1.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     # task2
...     task2 = Task(...)
...     submission2 = Submission(..., task_list=[task2])
...     background_task = asyncio.create_task(
...         submission2.async_run_submission(check_interval=2, clean=False)
...     )
...     background_tasks.add(background_task)
...     result = await asyncio.gather(*background_tasks)
...     return result
>>> asyncio.run(run_jobs())

May raise an error if clean=True is passed explicitly when submitting to PBS or Slurm.

bind_machine(machine)[source]

Bind this submission to a machine and update the machine context’s remote_root and local_root.

Parameters:
machine : Machine

the machine to bind with

check_all_finished()[source]

Check whether all the jobs in the submission have finished.

Notes

This method will not handle unexpected job state in the submission.

check_ratio_unfinished(ratio_unfinished: float) bool[source]

Calculate the ratio of unfinished tasks in the submission.

Parameters:
ratio_unfinished : float

the ratio of unfinished tasks in the submission

Returns:
bool

whether the ratio of unfinished tasks in the submission is larger than ratio_unfinished

clean_jobs()[source]
classmethod deserialize(submission_dict, machine=None)[source]

Convert the submission_dict to a Submission class object.

Parameters:
submission_dict : dict

the dictionary which contains the submission information

machine : Machine

Machine class object to execute the jobs

Returns:
submission : Submission

the Submission class instance converted from the submission_dict

download_jobs()[source]
generate_jobs()[source]

After tasks are registered to self.belonging_tasks, this method generates the jobs and adds them to self.belonging_jobs. The tasks are shuffled randomly into jobs, with self.resources.group_size tasks per job. The random shuffle is done for load balancing, and the random seed is a constant (to be concrete, 42), which ensures that the generated jobs are identical when the program is re-run.

get_hash()[source]
handle_unexpected_submission_state()[source]

Handle unexpected job state of the submission. If the job state is unsubmitted, submit the job. If the job state is terminated (killed unexpectedly), resubmit the job. If the job state is unknown, raise an error.

register_task(task)[source]
register_task_list(task_list)[source]
remove_unfinished_tasks()[source]
run_submission(*, dry_run=False, exit_on_submit=False, clean=True, check_interval=30)[source]

Main method to execute the submission. First, check whether an old Submission exists on the remote machine and try to recover from it. Second, upload the local files to the remote machine where the tasks are to be executed. Third, run the submission defined previously. Fourth, wait until the tasks in the submission finish and download the result files to the local directory. If dry_run is True, the submission will be uploaded but not executed, and the method exits. If exit_on_submit is True, the method exits right after the jobs are submitted.

serialize(if_static=False)[source]

Convert the Submission class instance to a dictionary.

Parameters:
if_static : bool

whether to dump the job runtime information (like job_id, job_state, fail_count) to the dictionary.

Returns:
submission_dict : dict

the dictionary converted from the Submission class instance

classmethod submission_from_json(json_file_name='submission.json')[source]
submission_to_json()[source]
try_download_result()[source]
try_recover_from_json()[source]
update_submission_state()[source]

Check and update the states of all the jobs in the submission.

Notes

This method will not handle unexpected job states (such as resubmitting terminated jobs) in the submission.

upload_jobs()[source]
class dpdispatcher.submission.Task(command, task_work_path, forward_files=[], backward_files=[], outlog='log', errlog='err')[source]

Bases: object

A task is a command to be executed, together with the essential files it depends on, which are transmitted forward (before execution) and backward (after execution).

Parameters:
command : str

the command to be executed.

task_work_path : Path

the working directory of the task, where the dependent files are located.

forward_files : list of Path

the files to be transmitted to the remote machine before the command executes.

backward_files : list of Path

the files to be transmitted from the remote machine after the command finishes.

outlog : str

the filename to which the command redirects stdout

errlog : str

the filename to which the command redirects stderr

Methods

deserialize(task_dict)

Convert the task_dict to a Task class object.

get_task_state(context)

Get the task state by checking the tag file.

arginfo

get_hash

load_from_dict

load_from_json

load_from_yaml

serialize

static arginfo()[source]
classmethod deserialize(task_dict)[source]

Convert the task_dict to a Task class object.

Parameters:
task_dict : dict

the dictionary which contains the task information

Returns:
task : Task

the Task class instance converted from the task_dict

get_hash()[source]
get_task_state(context)[source]

Get the task state by checking the tag file.

Parameters:
context : Context

the context of the task

classmethod load_from_dict(task_dict: dict) Task[source]
classmethod load_from_json(json_file)[source]
classmethod load_from_yaml(yaml_file)[source]
serialize()[source]

Running the DeePMD-kit on the Expanse cluster

Expanse is a cluster operated by the San Diego Supercomputer Center. Here we provide an example of running jobs on Expanse.

The machine parameters are provided below. Expanse uses the Slurm workload manager for job scheduling, and remote_root has been created in advance. It is worth mentioning that we do not recommend password authentication; SSH keys are used instead to improve security.

{
  "batch_type": "Slurm",
  "local_root": "./",
  "remote_root": "/expanse/lustre/scratch/njzjz/temp_project/dpgen_workdir",
  "clean_asynchronously": true,
  "context_type": "SSHContext",
  "remote_profile": {
    "hostname": "login.expanse.sdsc.edu",
    "username": "njzjz",
    "port": 22
  }
}

Expanse's standard compute nodes are each powered by two 64-core AMD EPYC 7742 processors and contain 256 GB of DDR4 memory. Here, we request one node with 32 cores and 16 GB of memory from the shared partition. Expanse does not support the --gres=gpu:0 option, so we use custom_gpu_line to customize the statement.

{
  "number_node": 1,
  "cpu_per_node": 1,
  "gpu_per_node": 0,
  "queue_name": "shared",
  "group_size": 1,
  "custom_flags": [
    "#SBATCH -c 32",
    "#SBATCH --mem=16G",
    "#SBATCH --time=48:00:00",
    "#SBATCH --account=rut149",
    "#SBATCH --requeue"
  ],
  "source_list": [
    "activate /home/njzjz/deepmd-kit"
  ],
  "envs": {
    "OMP_NUM_THREADS": 4,
    "TF_INTRA_OP_PARALLELISM_THREADS": 4,
    "TF_INTER_OP_PARALLELISM_THREADS": 8,
    "DP_AUTO_PARALLELIZATION": 1
  },
  "batch_type": "Slurm",
  "kwargs": {
    "custom_gpu_line": "#SBATCH --gpus=0"
  }
}

The following task parameters run a DeePMD-kit task, forwarding an input file and backwarding the graph files. The data set is shared among all the tasks, so it is not included in forward_files; instead, it should be included in the submission’s forward_common_files (see the sketch after the task parameters below).

{
    "command": "dp train input.json && dp freeze && dp compress",
    "task_work_path": "model1/",
    "forward_files": [
      "input.json"
    ],
    "backward_files": [
      "frozen_model.pb",
      "frozen_model_compressed.pb"
    ],
    "outlog": "log",
    "errlog": "err"
}
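
Putting the three files together, a submission script for this example might look like the following sketch. The data/ directory name is hypothetical; as noted above, the shared data set goes into forward_common_files rather than each task's forward_files.

from dpdispatcher import Machine, Resources, Submission, Task

machine = Machine.load_from_json("machine.json")
resources = Resources.load_from_json("resources.json")
task = Task.load_from_json("task.json")

submission = Submission(
    work_base="training/",           # hypothetical local project directory
    machine=machine,
    resources=resources,
    task_list=[task],
    forward_common_files=["data/"],  # the data set shared by all tasks
    backward_common_files=[],
)
submission.run_submission()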

Running Gaussian 16 with failure allowed

Typically, a task will retry three times if the exit code is not zero. Sometimes, a non-zero exit code may be acceptable. For example, when running a large number of Gaussian 16 single-point calculation tasks, some of them may throw SCF errors and return a non-zero code. One can append ||: to the command:

{
    "command": "g16 < input > output ||:",
    "task_work_path": "p1/",
    "forward_files": [
      "input"
    ],
    "backward_files": [
      "output"
    ]
}

This ensures the command always returns a zero exit code.

Running multiple MD tasks on a GPU workstation

In this example, we show how to run multiple MD tasks on a GPU workstation. This workstation does not have any job scheduling system installed, so we use Shell as the batch_type.

{
  "batch_type": "Shell",
  "local_root": "./",
  "remote_root": "/data2/jinzhe/dpgen_workdir",
  "clean_asynchronously": true,
  "context_type": "SSHContext",
  "remote_profile": {
    "hostname": "mandu.iqb.rutgers.edu",
    "username": "jz748",
    "port": 22
  }
}

The workstation has 48 CPU cores and 8 RTX 3090 cards. Here we want each card to run 6 tasks at the same time, as each task does not consume too many GPU resources. Thus, strategy/if_cuda_multi_devices is set to true and para_deg is set to 6.

{
  "number_node": 1,
  "cpu_per_node": 48,
  "gpu_per_node": 8,
  "queue_name": "shell",
  "group_size": 0,
  "strategy": {
    "if_cuda_multi_devices": true
  },
  "source_list": [
    "activate /home/jz748/deepmd-kit"
  ],
  "envs": {
    "OMP_NUM_THREADS": 1,
    "TF_INTRA_OP_PARALLELISM_THREADS": 1,
    "TF_INTER_OP_PARALLELISM_THREADS": 1
  },
  "para_deg": 6
}

Note that group_size should be set to 0 (which means infinity) to ensure there is only one job, avoiding multiple jobs running at the same time.
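
For reference, an equivalent Resources object can be constructed directly in Python; this is only a sketch mirroring the JSON above (the strategy dictionary keeps the documented default ratio_unfinished of 0.0):

from dpdispatcher import Resources

resources = Resources(
    number_node=1,
    cpu_per_node=48,
    gpu_per_node=8,
    queue_name="shell",
    group_size=0,  # 0 means all tasks are grouped into a single job
    strategy={"if_cuda_multi_devices": True, "ratio_unfinished": 0.0},
    source_list=["activate /home/jz748/deepmd-kit"],
    envs={
        "OMP_NUM_THREADS": 1,
        "TF_INTRA_OP_PARALLELISM_THREADS": 1,
        "TF_INTER_OP_PARALLELISM_THREADS": 1,
    },
    para_deg=6,
)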

Customizing the submission script header

When submitting jobs to some clusters, such as the Tiger Cluster at Princeton University, the Slurm header is quite different from the standard one. In this case, DPDispatcher allows users to customize the templates by setting strategy/customized_script_header_template_file to a template file:

{
  "number_node": 1,
  "cpu_per_node": 32,
  "kwargs": {
    "qos": "tiger-vshort"
  },
  "source_list": ["activate abacus_env"],
  "strategy": {
    "customized_script_header_template_file": "./template.slurm"
  },
  "group_size": 2000
}

template.slurm is the template file; str.format() is used to fill the template with the Resources parameters:

#!/bin/bash -l
#SBATCH --parsable
#SBATCH --nodes={number_node}
#SBATCH --ntasks-per-node={cpu_per_node}
#SBATCH --qos={kwargs[qos]}
#SBATCH --time=01:02:00
#SBATCH --mem-per-cpu=4G

See Python Format String Syntax for how to insert parameters inside the template.
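
The substitution itself is plain Python string formatting. A tiny, self-contained illustration of how fields from the resources configuration above would be filled into such a template (the template string here is shortened):

# how str.format() fills the Slurm header template with Resources fields
template = (
    "#SBATCH --nodes={number_node}\n"
    "#SBATCH --ntasks-per-node={cpu_per_node}\n"
    "#SBATCH --qos={kwargs[qos]}\n"
)
print(template.format(number_node=1, cpu_per_node=32, kwargs={"qos": "tiger-vshort"}))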

Authors

  • AnguseZhang

  • Byron

  • Cloudac7

  • Feifei Tian

  • Feiyang472

  • Franklalalala

  • Futaki Haduki

  • Futaki Hatsuki

  • Han Wang

  • Han Y.B

  • HuangJiameng

  • Jinzhe Zeng

  • KZHIWEI

  • Levi Zhou

  • PKUfjh

  • Pengchao Zhang

  • Tongqi Wen

  • TongqiWen

  • Xiaoshan Luo

  • Xuanyan Chen

  • Yifan Li李一帆

  • Yixiao Chen

  • Yongbin Zhuang

  • Yuan Fengbo

  • Yuan Fengbo (袁奉博)

  • Yunpei Liu

  • Zhang Yaotang

  • Zhengju Sha

  • Zhiwei Zhang

  • chenglab

  • ck

  • dependabot[bot]

  • dingzhaohan

  • dinngzhaohan

  • felix5572

  • haidi

  • likefallwind

  • luobangkui

  • pre-commit-ci[bot]

  • robinzyb

  • saltball

  • shazj99

  • tuoping

  • unknown

  • wangxiangfei

  • yuzhi

  • zhangbei07

  • zhaohan

  • zjgemi
