# Source code for deepmd.train.run_options

"""Module taking care of important package constants."""

import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

import numpy as np
from deepmd.cluster import get_resource
from deepmd.env import get_tf_default_nthreads, tf, GLOBAL_CONFIG, global_float_prec
from deepmd.loggers import set_log_handles

# Imported for annotations only; the runtime import of horovod is attempted
# lazily inside RunOptions._try_init_distrib.
if TYPE_CHECKING:
    import horovod.tensorflow as HVD

# NOTE(review): the original ``__all__`` listing was truncated in the source;
# it is rebuilt from the public names defined in this module - confirm.
__all__ = [
    "WELCOME",
    "CITATION",
    "BUILD",
    "RunOptions",
]

log = logging.getLogger(__name__)

# Font: Big
# Raw strings keep every backslash of the banner verbatim without relying on
# unrecognized escape sequences such as "\ " or "\_", which newer Python
# versions warn about. The resulting string values are unchanged.
WELCOME = (  # noqa
    r" _____               _____   __  __  _____           _     _  _   ",
    r"|  __ \             |  __ \ |  \/  ||  __ \         | |   (_)| |  ",
    r"| |  | |  ___   ___ | |__) || \  / || |  | | ______ | | __ _ | |_ ",
    r"| |  | | / _ \ / _ \|  ___/ | |\/| || |  | ||______|| |/ /| || __|",
    r"| |__| ||  __/|  __/| |     | |  | || |__| |        |   < | || |_ ",
    r"|_____/  \___| \___||_|     |_|  |_||_____/         |_|\_\|_| \__|",
)

# NOTE(review): the assignment header of this tuple was missing from the
# source; the name CITATION is reconstructed - confirm against importers.
CITATION = (
    "Please read and cite:",
    "Wang, Zhang, Han and E, Comput.Phys.Comm. 228, 178-184 (2018)",
)

# Continuation separator: a newline followed by padding that matches the
# 22-character label column used by the entries below, so that a
# ';'-separated library list is printed one entry per line.
_sep = "\n                      "

# Build-time configuration summary, one "label: value" line per entry.
# NOTE(review): the assignment header of this tuple was missing from the
# source; the name BUILD is reconstructed - confirm against importers.
BUILD = (
    f"installed to:         {GLOBAL_CONFIG['install_prefix']}",
    f"source :              {GLOBAL_CONFIG['git_summ']}",
    f"source branch:        {GLOBAL_CONFIG['git_branch']}",
    f"source commit:        {GLOBAL_CONFIG['git_hash']}",
    f"source commit at:     {GLOBAL_CONFIG['git_date']}",
    f"build float prec:     {global_float_prec}",
    f"build variant:        {GLOBAL_CONFIG['dp_variant']}",
    f"build with tf inc:    {GLOBAL_CONFIG['tf_include_dir']}",
    f"build with tf lib:    {GLOBAL_CONFIG['tf_libs'].replace(';', _sep)}",  # noqa
)

class RunOptions:
    """Class with info on how to run training (cluster, MPI and GPU config).

    Attributes
    ----------
    gpus: Optional[List[int]]
        list of GPUs if any are present else None
    is_chief: bool
        in distributed training it is true for the main MPI process,
        in serial it is always true
    world_size: int
        total worker count
    my_rank: int
        index of the MPI task
    nodename: str
        name of the node
    nodelist: List[str]
        the list of nodes of the current mpirun
    my_device: str
        device type - gpu or cpu
    """

    gpus: Optional[List[int]]
    world_size: int
    my_rank: int
    nodename: str
    nodelist: List[str]
    my_device: str

    _HVD: Optional["HVD"]
    # NOTE(review): this default lives on the class, but _setup_logger assigns
    # the flag through ``self``, so it is tracked per instance. If a
    # process-wide guard was intended, confirm and assign on the class instead.
    _log_handles_already_set: bool = False

    def __init__(
        self,
        init_model: Optional[str] = None,
        init_frz_model: Optional[str] = None,
        restart: Optional[str] = None,
        log_path: Optional[str] = None,
        log_level: int = 0,
        mpi_log: str = "master"
    ):
        """Detect the execution environment and record model-init options.

        Parameters
        ----------
        init_model : Optional[str]
            path used for the "init_from_model" mode, stored as absolute path
        init_frz_model : Optional[str]
            path used for the "init_from_frz_model" mode, stored as absolute
            path
        restart : Optional[str]
            path used for the "restart" mode, stored as absolute path
        log_path : Optional[str]
            path to the log file, a falsy value means no log file path is
            passed to the logger setup
        log_level : int
            logging level forwarded to the logger setup
        mpi_log : str
            mpi log type forwarded to the logger setup

        Raises
        ------
        RuntimeError
            if both `init_model` and `restart` are given
        """
        self._try_init_distrib()

        if all((init_model, restart)):
            raise RuntimeError(
                "--init-model and --restart should not be set at the same time"
            )

        # model init options; every attribute is always assigned so that
        # reading any of them never raises AttributeError, whatever the mode
        self.restart = restart
        self.init_model = init_model
        self.init_frz_model = init_frz_model
        self.init_mode = "init_from_scratch"

        # precedence: restart > init_model > init_frz_model
        if restart is not None:
            self.restart = os.path.abspath(restart)
            self.init_mode = "restart"
        elif init_model is not None:
            self.init_model = os.path.abspath(init_model)
            self.init_mode = "init_from_model"
        elif init_frz_model is not None:
            self.init_frz_model = os.path.abspath(init_frz_model)
            self.init_mode = "init_from_frz_model"

        self._setup_logger(Path(log_path) if log_path else None, log_level, mpi_log)

    @property
    def is_chief(self) -> bool:
        """Whether my rank is 0."""
        return self.my_rank == 0

    def print_resource_summary(self):
        """Print build and current running cluster configuration summary."""
        # NOTE(review): whitespace inside these messages was collapsed in the
        # source this block was restored from; any column padding the
        # messages may have had is not reproduced here - confirm.
        log.info("---Summary of the training---------------------------------------")
        if self.is_distrib:
            log.info("distributed")
            log.info(f"world size: {self.world_size}")
            log.info(f"my rank: {self.my_rank}")
            log.info(f"node list: {self.nodelist}")
        log.info(f"running on: {self.nodename}")
        log.info(f"computing device: {self.my_device}")
        env_value = os.environ.get('CUDA_VISIBLE_DEVICES', 'unset')
        log.info(f"CUDA_VISIBLE_DEVICES: {env_value}")
        # self.gpus may be None when no GPU is present
        log.info(f"Count of visible GPU: {len(self.gpus or [])}")
        intra, inter = get_tf_default_nthreads()
        log.info(f"num_intra_threads: {intra:d}")
        log.info(f"num_inter_threads: {inter:d}")
        log.info("-----------------------------------------------------------------")

    def _setup_logger(
        self,
        log_path: Optional[Path],
        log_level: int,
        mpi_log: Optional[str],
    ):
        """Set up package loggers.

        Parameters
        ----------
        log_level: int
            logging level
        log_path: Optional[str]
            path to log file, if None logs will be sent only to console. If the
            parent directory does not exist it will be automatically created,
            by default None
        mpi_log : Optional[str], optional
            mpi log type. Has three options. `master` will output logs to file
            and console only from rank==0. `collect` will write messages from
            all ranks to one file opened under rank==0 and to console.
            `workers` will open one log file for each worker designated by its
            rank, console behaviour is the same as for `collect`.
        """
        if not self._log_handles_already_set:
            # without horovod there is no MPI-aware logging to configure
            if not self._HVD:
                mpi_log = None
            set_log_handles(log_level, log_path, mpi_log=mpi_log)
            self._log_handles_already_set = True
            log.debug("Log handles were successfully set")
        else:
            log.warning(
                f"Log handles have already been set. It is not advisable to "
                f"reset them{', especially when running with MPI!' if self._HVD else ''}"
            )

    def _try_init_distrib(self):
        """Initialize distributed mode if horovod reports more than one worker.

        Falls back to serial initialization when horovod cannot be imported or
        when it reports a single worker.
        """
        try:
            import horovod.tensorflow as HVD
            HVD.init()
            self.is_distrib = HVD.size() > 1
        except ImportError:
            log.warning("Switch to serial execution due to lack of horovod module.")
            HVD = None
            self.is_distrib = False

        # Do real initialization
        if self.is_distrib:
            self._init_distributed(HVD)
            self._HVD = HVD
        else:
            self._init_serial()
            self._HVD = None

    def _init_distributed(self, HVD: "HVD"):
        """Initialize settings for distributed training.

        Parameters
        ----------
        HVD : HVD
            horovod object

        Raises
        ------
        RuntimeError
            if the local rank has no matching entry in the list of GPUs
        """
        nodename, nodelist, gpus = get_resource()
        self.nodename = nodename
        self.nodelist = nodelist
        self.gpus = gpus
        self.my_rank = HVD.rank()
        self.world_size = HVD.size()

        if gpus is not None:
            # the device index is the horovod local rank of this process
            gpu_idx = HVD.local_rank()
            if gpu_idx >= len(gpus):
                raise RuntimeError('Count of local processes is larger than that of available GPUs!')
            self.my_device = f"gpu:{gpu_idx:d}"
        else:
            self.my_device = "cpu:0"

    def _init_serial(self):
        """Initialize setting for serial training."""
        nodename, _, gpus = get_resource()

        self.gpus = gpus
        self.world_size = 1
        self.my_rank = 0
        self.nodename = nodename
        self.nodelist = [nodename]

        if gpus is not None:
            self.my_device = "gpu:0"
        else:
            self.my_device = "cpu:0"

        self._HVD = None