# Source code for dpgen2.op.run_dp_train

import copy
import glob
import json
import os
import shutil
from pathlib import Path
from typing import (
    Tuple,
    List,
)

import dpdata
from dflow.python import (
    OP,
    OPIO,
    OPIOSign,
    Artifact,
    TransientError,
    FatalError,
)
from dargs import (
    dargs,
    Argument,
    Variant,
    ArgumentEncoder,
)

from dpgen2.utils.run_command import run_command
from dpgen2.utils.chdir import set_directory
from dpgen2.constants import (
    train_task_pattern,
    train_script_name,
)


class RunDPTrain(OP):
    r"""Execute a DP training task. Train and freeze a DP model.

    The work is done inside a directory named `task_name`: the updated
    training script is written there, and the DeePMD-kit training and
    freezing commands are executed from that directory.
    """

    @classmethod
    def get_input_sign(cls):
        return OPIOSign(
            {
                "config": dict,
                "task_name": str,
                "task_path": Artifact(Path),
                "init_model": Artifact(Path, optional=True),
                "init_data": Artifact(List[Path]),
                "iter_data": Artifact(List[Path]),
            }
        )

    @classmethod
    def get_output_sign(cls):
        return OPIOSign(
            {
                "script": Artifact(Path),
                "model": Artifact(Path),
                "lcurve": Artifact(Path),
                "log": Artifact(Path),
            }
        )

    @OP.exec_sign_check
    def execute(
        self,
        ip: OPIO,
    ) -> OPIO:
        r"""Execute the OP.

        Parameters
        ----------
        ip : dict
            Input dict with components:

            - `config`: (`dict`) The config of training task. Check `RunDPTrain.training_args` for definitions.
            - `task_name`: (`str`) The name of training task.
            - `task_path`: (`Artifact(Path)`) The path that contains all input files prepared by `PrepDPTrain`.
            - `init_model`: (`Artifact(Path)`) A frozen model to initialize the training.
            - `init_data`: (`Artifact(List[Path])`) Initial training data.
            - `iter_data`: (`Artifact(List[Path])`) Training data generated in the DPGEN iterations.

        Returns
        -------
        Output dict with components:

        - `script`: (`Artifact(Path)`) The training script.
        - `model`: (`Artifact(Path)`) The trained frozen model.
        - `lcurve`: (`Artifact(Path)`) The learning curve file.
        - `log`: (`Artifact(Path)`) The log file of training.

        Raises
        ------
        FatalError
            On the failure of training or freezing. Human intervention needed.
        """
        config = ip["config"] if ip["config"] is not None else {}
        config = RunDPTrain.normalize_config(config)
        task_name = ip["task_name"]
        task_path = ip["task_path"]
        init_model = ip["init_model"]
        init_data = ip["init_data"]
        iter_data = ip["iter_data"]
        # Each iter_data entry is a multi-system directory; expand it into
        # its individual systems. The last entry is treated as the "new"
        # data, everything before it as "old" data.
        iter_data_old_exp = _expand_all_multi_sys_to_sys(iter_data[:-1])
        iter_data_new_exp = _expand_all_multi_sys_to_sys(iter_data[-1:])
        iter_data_exp = iter_data_old_exp + iter_data_new_exp
        work_dir = Path(task_name)

        # load the input script prepared upstream
        input_script = Path(task_path) / train_script_name
        with open(input_script) as fp:
            train_dict = json.load(fp)
        # v1 scripts list the systems directly under "training"; v2 scripts
        # nest them under "training" -> "training_data".
        major_version = "1" if "systems" in train_dict["training"] else "2"

        # auto prob style
        do_init_model = RunDPTrain.decide_init_model(
            config, init_model, init_data, iter_data
        )
        auto_prob_str = "prob_sys_size"
        if do_init_model:
            # old systems (init data + earlier iterations) get a total weight
            # of old_ratio, the new systems share the remainder.
            old_ratio = config["init_model_old_ratio"]
            numb_old = len(init_data) + len(iter_data_old_exp)
            numb_new = numb_old + len(iter_data_new_exp)
            auto_prob_str = f"prob_sys_size; 0:{numb_old}:{old_ratio}; {numb_old}:{numb_new}:{1.-old_ratio:g}"

        # update the input dict
        train_dict = RunDPTrain.write_data_to_input_script(
            train_dict, init_data, iter_data_exp, auto_prob_str, major_version
        )
        train_dict = RunDPTrain.write_other_to_input_script(
            train_dict, config, do_init_model, major_version
        )

        if RunDPTrain.skip_training(work_dir, train_dict, init_model, iter_data):
            return RunDPTrain._make_output(work_dir)

        if do_init_model:
            command = [
                "dp",
                "train",
                "--init-frz-model",
                str(init_model),
                train_script_name,
            ]
        else:
            command = ["dp", "train", train_script_name]

        # The `with` guarantees train.log is closed on every exit path,
        # including an exception raised by run_command itself.
        with set_directory(work_dir), open("train.log", "w") as fplog:
            # dump train script
            with open(train_script_name, "w") as fp:
                json.dump(train_dict, fp, indent=4)

            # train model
            ret, out, err = run_command(command)
            if ret != 0:
                raise FatalError(
                    "dp train failed\n", "out msg", out, "\n", "err msg", err, "\n"
                )
            RunDPTrain._write_log_section(fplog, "train", out, err)

            # freeze model
            ret, out, err = run_command(["dp", "freeze", "-o", "frozen_model.pb"])
            if ret != 0:
                raise FatalError(
                    "dp freeze failed\n", "out msg", out, "\n", "err msg", err, "\n"
                )
            RunDPTrain._write_log_section(fplog, "freeze", out, err)

        return RunDPTrain._make_output(work_dir)

    @staticmethod
    def _make_output(work_dir: Path) -> OPIO:
        """Build the output OPIO pointing at the result files in ``work_dir``."""
        return OPIO(
            {
                "script": work_dir / train_script_name,
                "model": work_dir / "frozen_model.pb",
                "lcurve": work_dir / "lcurve.out",
                "log": work_dir / "train.log",
            }
        )

    @staticmethod
    def _write_log_section(fplog, stage: str, out: str, err: str) -> None:
        """Append the stdout and stderr of one command ``stage`` to ``fplog``."""
        fplog.write(f"#=================== {stage} std out ===================\n")
        fplog.write(out)
        fplog.write(f"#=================== {stage} std err ===================\n")
        fplog.write(err)

    @staticmethod
    def write_data_to_input_script(
        idict: dict,
        init_data: List[Path],
        iter_data: List[Path],
        auto_prob_str: str = "prob_sys_size",
        major_version: str = "1",
    ):
        """Return a copy of ``idict`` with the training systems filled in.

        Parameters
        ----------
        idict : dict
            The training input script; it is not modified.
        init_data, iter_data : List[Path]
            Systems written, as strings and in this order, into the data list.
        auto_prob_str : str
            Value written to the auto-prob key.
        major_version : str
            "1" writes ``training.systems`` / ``training.auto_prob_style``;
            "2" writes ``training.training_data.systems`` /
            ``training.training_data.auto_prob`` and drops
            ``training.validation_data``. ``batch_size`` defaults to "auto".

        Raises
        ------
        RuntimeError
            If ``major_version`` is neither "1" nor "2".
        """
        # deepcopy: the nested "training" dict is mutated below, and a
        # shallow copy would leak those changes back into the caller's dict.
        odict = copy.deepcopy(idict)
        data_list = [str(ii) for ii in init_data] + [str(ii) for ii in iter_data]
        if major_version == "1":
            # v1 behavior
            odict["training"]["systems"] = data_list
            odict["training"].setdefault("batch_size", "auto")
            odict["training"]["auto_prob_style"] = auto_prob_str
        elif major_version == "2":
            # v2 behavior
            odict["training"]["training_data"]["systems"] = data_list
            odict["training"]["training_data"].setdefault("batch_size", "auto")
            odict["training"]["training_data"]["auto_prob"] = auto_prob_str
            odict["training"].pop("validation_data", None)
        else:
            raise RuntimeError("unsupported DeePMD-kit major version", major_version)
        return odict

    @staticmethod
    def write_other_to_input_script(
        idict,
        config,
        do_init_model,
        major_version: str = "1",
    ):
        """Return a copy of ``idict`` with non-data settings applied.

        Always sets ``training.disp_file`` to "lcurve.out". When
        ``do_init_model`` is true, the learning rate, the loss start
        prefactors and the number of steps are overridden from the
        ``init_model_*`` entries of ``config``. The step key is
        ``stop_batch`` for v1 and ``numb_steps`` for v2.

        Raises
        ------
        RuntimeError
            If ``do_init_model`` is true and ``major_version`` is neither
            "1" nor "2".
        """
        # deepcopy so the caller's nested dicts are left untouched
        odict = copy.deepcopy(idict)
        odict["training"]["disp_file"] = "lcurve.out"
        if do_init_model:
            odict["learning_rate"]["start_lr"] = config["init_model_start_lr"]
            odict["loss"]["start_pref_e"] = config["init_model_start_pref_e"]
            odict["loss"]["start_pref_f"] = config["init_model_start_pref_f"]
            odict["loss"]["start_pref_v"] = config["init_model_start_pref_v"]
            if major_version == "1":
                odict["training"]["stop_batch"] = config["init_model_numb_steps"]
            elif major_version == "2":
                odict["training"]["numb_steps"] = config["init_model_numb_steps"]
            else:
                raise RuntimeError(
                    "unsupported DeePMD-kit major version", major_version
                )
        return odict

    @staticmethod
    def skip_training(
        work_dir,
        train_dict,
        init_model,
        iter_data,
    ):
        """Skip training when there is an init model but no iteration data.

        In that case the training script, an explanatory ``train.log`` and an
        empty ``lcurve.out`` are written into ``work_dir``, and ``init_model``
        is copied to ``frozen_model.pb`` there.

        Returns
        -------
        bool
            True if training was skipped, False otherwise.
        """
        has_iter_data = iter_data is not None and len(iter_data) != 0
        if init_model is None or has_iter_data:
            return False
        with set_directory(work_dir):
            with open(train_script_name, "w") as fp:
                json.dump(train_dict, fp, indent=4)
            Path("train.log").write_text(
                f"We have init model {init_model} and "
                f"no iteration training data. "
                f"The training is skipped.\n"
            )
            Path("lcurve.out").touch()
            shutil.copy(init_model, "frozen_model.pb")
        return True

    @staticmethod
    def decide_init_model(
        config,
        init_model,
        init_data,
        iter_data,
    ):
        """Decide whether the training should start from ``init_model``.

        Without an init model or without iteration data the answer is always
        False. Otherwise ``config["init_model_policy"]`` decides:

        - "no": False
        - "yes": True
        - "old_data_larger_than:XXX": True if the number of frames in
          ``init_data`` plus all but the last ``iter_data`` entry exceeds XXX.
        """
        # cases we do definitely not
        if init_model is None or iter_data is None or len(iter_data) == 0:
            return False
        # cases controlled by the policy
        policy = config["init_model_policy"]
        if policy == "no":
            return False
        if policy == "yes":
            return True
        if "old_data_larger_than" in policy:
            old_data_size_level = int(policy.split(":")[-1])
            init_data_size = _get_data_size_of_all_systems(init_data)
            iter_data_old_size = _get_data_size_of_all_mult_sys(iter_data[:-1])
            return init_data_size + iter_data_old_size > old_data_size_level
        # NOTE(review): an unrecognized policy silently falls back to
        # training from scratch.
        return False

    @staticmethod
    def training_args():
        """Return the dargs ``Argument`` list describing the OP config."""
        doc_init_model_policy = (
            "The policy of init-model training. It can be\n\n"
            "- 'no': No init-model training. Training from scratch.\n\n"
            "- 'yes': Do init-model training.\n\n"
            "- 'old_data_larger_than:XXX': Do init-model if the training data size "
            "of the previous model is larger than XXX. XXX is an int number."
        )
        doc_init_model_old_ratio = "The frequency ratio of old data over new data"
        doc_init_model_numb_steps = "The number of training steps when init-model"
        doc_init_model_start_lr = "The start learning rate when init-model"
        doc_init_model_start_pref_e = (
            "The start energy prefactor in loss when init-model"
        )
        doc_init_model_start_pref_f = (
            "The start force prefactor in loss when init-model"
        )
        doc_init_model_start_pref_v = (
            "The start virial prefactor in loss when init-model"
        )
        return [
            Argument(
                "init_model_policy",
                str,
                optional=True,
                default="no",
                doc=doc_init_model_policy,
            ),
            Argument(
                "init_model_old_ratio",
                float,
                optional=True,
                default=0.9,
                doc=doc_init_model_old_ratio,
            ),
            Argument(
                "init_model_numb_steps",
                int,
                optional=True,
                default=400000,
                doc=doc_init_model_numb_steps,
                alias=["init_model_stop_batch"],
            ),
            Argument(
                "init_model_start_lr",
                float,
                optional=True,
                default=1e-4,
                doc=doc_init_model_start_lr,
            ),
            Argument(
                "init_model_start_pref_e",
                float,
                optional=True,
                default=0.1,
                doc=doc_init_model_start_pref_e,
            ),
            Argument(
                "init_model_start_pref_f",
                float,
                optional=True,
                default=100,
                doc=doc_init_model_start_pref_f,
            ),
            Argument(
                "init_model_start_pref_v",
                float,
                optional=True,
                default=0.0,
                doc=doc_init_model_start_pref_v,
            ),
        ]

    @staticmethod
    def normalize_config(data=None):
        """Normalize ``data`` against ``training_args`` and strictly check it.

        ``None`` (the default) is treated as an empty config.
        """
        # avoid a shared mutable default argument
        if data is None:
            data = {}
        ta = RunDPTrain.training_args()
        base = Argument("base", dict, ta)
        data = base.normalize_value(data, trim_pattern="_*")
        base.check_value(data, strict=True)
        return data
def _get_data_size_of_system(data_dir):
    """Return the number of frames of the deepmd/npy system at ``data_dir``."""
    ss = dpdata.System(data_dir, fmt="deepmd/npy")
    return ss.get_nframes()


def _get_data_size_of_all_systems(data_dirs):
    """Return the total number of frames over the single systems ``data_dirs``."""
    return sum(_get_data_size_of_system(ii) for ii in data_dirs)


def _get_data_size_of_mult_sys(data_dir):
    """Return the number of frames of the deepmd/npy multi-system at ``data_dir``."""
    ms = dpdata.MultiSystems()
    ms.from_deepmd_npy(data_dir)
    return ms.get_nframes()


def _get_data_size_of_all_mult_sys(data_dirs):
    """Return the total number of frames over the multi-systems ``data_dirs``."""
    return sum(_get_data_size_of_mult_sys(ii) for ii in data_dirs)


def _expand_multi_sys_to_sys(multi_sys_dir):
    """List the immediate subdirectories of ``multi_sys_dir`` holding a ``type.raw``.

    The result is sorted (by the path of ``type.raw``) and returned as strings.
    """
    all_type_raws = sorted(glob.glob(os.path.join(multi_sys_dir, "*", "type.raw")))
    return [str(Path(ii).parent) for ii in all_type_raws]


def _expand_all_multi_sys_to_sys(list_multi_sys):
    """Expand every multi-system directory and concatenate the results in order."""
    all_sys_dirs = []
    for ii in list_multi_sys:
        # extend in place instead of `a = a + b`, which copies the list on
        # every iteration (quadratic in the total number of systems)
        all_sys_dirs.extend(_expand_multi_sys_to_sys(ii))
    return all_sys_dirs


# Module-level alias of the config argument definitions.
config_args = RunDPTrain.training_args