import glob
import json
import os
import shutil

import dpdata
from pathlib import Path
from dpgen2.utils.run_command import run_command
from dpgen2.utils.chdir import set_directory
from dflow.python import (
OP,
OPIO,
OPIOSign,
Artifact,
TransientError,
FatalError,
)
from typing import (
Tuple,
List,
)
from dpgen2.constants import (
train_task_pattern,
train_script_name,
)
from dargs import (
dargs,
Argument,
Variant,
ArgumentEncoder,
)
class RunDPTrain(OP):
r"""Execute a DP training task. Train and freeze a DP model.
A working directory named `task_name` is created. All input files
are copied or symbolically linked into the directory `task_name`. The
DeePMD-kit training and freezing commands are executed from
the directory `task_name`.
"""
@classmethod
def get_output_sign(cls):
return OPIOSign(
{
"script": Artifact(Path),
"model": Artifact(Path),
"lcurve": Artifact(Path),
"log": Artifact(Path),
}
)
@OP.exec_sign_check
def execute(
self,
ip: OPIO,
) -> OPIO:
r"""Execute the OP.
Parameters
----------
ip : dict
Input dict with components:
- `config`: (`dict`) The config of training task. Check `RunDPTrain.training_args` for definitions.
- `task_name`: (`str`) The name of training task.
- `task_path`: (`Artifact(Path)`) The path that contains all input files prepared by `PrepDPTrain`.
- `init_model`: (`Artifact(Path)`) A frozen model to initialize the training.
- `init_data`: (`Artifact(List[Path])`) Initial training data.
- `iter_data`: (`Artifact(List[Path])`) Training data generated in the DPGEN iterations.
Returns
-------
Output dict with components:
- `script`: (`Artifact(Path)`) The training script.
- `model`: (`Artifact(Path)`) The trained frozen model.
- `lcurve`: (`Artifact(Path)`) The learning curve file.
- `log`: (`Artifact(Path)`) The log file of training.
Raises
------
FatalError
On the failure of training or freezing. Human intervention needed.
"""
config = ip["config"] if ip["config"] is not None else {}
config = RunDPTrain.normalize_config(config)
task_name = ip["task_name"]
task_path = ip["task_path"]
init_model = ip["init_model"]
init_data = ip["init_data"]
iter_data = ip["iter_data"]
iter_data_old_exp = _expand_all_multi_sys_to_sys(iter_data[:-1])
iter_data_new_exp = _expand_all_multi_sys_to_sys(iter_data[-1:])
iter_data_exp = iter_data_old_exp + iter_data_new_exp
work_dir = Path(task_name)
# update the input script
input_script = Path(task_path) / train_script_name
with open(input_script) as fp:
train_dict = json.load(fp)
if "systems" in train_dict["training"]:
major_version = "1"
else:
major_version = "2"
# auto prob style
do_init_model = RunDPTrain.decide_init_model(
config, init_model, init_data, iter_data
)
auto_prob_str = "prob_sys_size"
if do_init_model:
old_ratio = config["init_model_old_ratio"]
numb_old = len(init_data) + len(iter_data_old_exp)
numb_new = numb_old + len(iter_data_new_exp)
auto_prob_str = f"prob_sys_size; 0:{numb_old}:{old_ratio}; {numb_old}:{numb_new}:{1.-old_ratio:g}"
# update the input dict
train_dict = RunDPTrain.write_data_to_input_script(
train_dict, init_data, iter_data_exp, auto_prob_str, major_version
)
train_dict = RunDPTrain.write_other_to_input_script(
train_dict, config, do_init_model, major_version
)
if RunDPTrain.skip_training(work_dir, train_dict, init_model, iter_data):
return OPIO(
{
"script": work_dir / train_script_name,
"model": work_dir / "frozen_model.pb",
"lcurve": work_dir / "lcurve.out",
"log": work_dir / "train.log",
}
)
with set_directory(work_dir):
# open log
fplog = open("train.log", "w")
def clean_before_quit():
fplog.close()
# dump train script
with open(train_script_name, "w") as fp:
json.dump(train_dict, fp, indent=4)
# train model
if do_init_model:
command = [
"dp",
"train",
"--init-frz-model",
str(init_model),
train_script_name,
]
else:
command = ["dp", "train", train_script_name]
ret, out, err = run_command(command)
if ret != 0:
clean_before_quit()
raise FatalError(
    f"dp train failed\nout msg: {out}\nerr msg: {err}\n"
)
fplog.write("#=================== train std out ===================\n")
fplog.write(out)
fplog.write("#=================== train std err ===================\n")
fplog.write(err)
# freeze model
ret, out, err = run_command(["dp", "freeze", "-o", "frozen_model.pb"])
if ret != 0:
clean_before_quit()
raise FatalError(
    f"dp freeze failed\nout msg: {out}\nerr msg: {err}\n"
)
fplog.write("#=================== freeze std out ===================\n")
fplog.write(out)
fplog.write("#=================== freeze std err ===================\n")
fplog.write(err)
clean_before_quit()
return OPIO(
{
"script": work_dir / train_script_name,
"model": work_dir / "frozen_model.pb",
"lcurve": work_dir / "lcurve.out",
"log": work_dir / "train.log",
}
)
@staticmethod
def skip_training(
work_dir,
train_dict,
init_model,
iter_data,
):
# we have init model and no iter data, skip training
if (init_model is not None) and (iter_data is None or len(iter_data) == 0):
with set_directory(work_dir):
with open(train_script_name, "w") as fp:
json.dump(train_dict, fp, indent=4)
Path("train.log").write_text(
f"We have init model {init_model} and "
f"no iteration training data. "
f"The training is skipped.\n"
)
Path("lcurve.out").touch()
shutil.copy(init_model, "frozen_model.pb")
return True
else:
return False
@staticmethod
def decide_init_model(
config,
init_model,
init_data,
iter_data,
):
do_init_model = False
# decide if we do init-model
## cases we do definitely not
if init_model is None or iter_data is None or len(iter_data) == 0:
do_init_model = False
## cases controlled by the policy
else:
if config["init_model_policy"] == "no":
do_init_model = False
elif config["init_model_policy"] == "yes":
do_init_model = True
elif "old_data_larger_than" in config["init_model_policy"]:
old_data_size_level = int(config["init_model_policy"].split(":")[-1])
init_data_size = _get_data_size_of_all_systems(init_data)
iter_data_old_size = _get_data_size_of_all_mult_sys(iter_data[:-1])
old_data_size = init_data_size + iter_data_old_size
if old_data_size > old_data_size_level:
do_init_model = True
return do_init_model
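# Hypothetical example: with config["init_model_policy"] == "old_data_larger_than:5000",
# init-model training is chosen only when the init data plus all but the latest
# iteration of data contain more than 5000 frames in total.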
@staticmethod
def training_args():
doc_init_model_policy = "The policy of init-model training. It can be\n\n\
- 'no': No init-model training. Train from scratch.\n\n\
- 'yes': Do init-model training.\n\n\
- 'old_data_larger_than:XXX': Do init-model training if the training data size of the previous model is larger than XXX. XXX is an int number."
doc_init_model_old_ratio = "The frequency ratio of old data over new data"
doc_init_model_numb_steps = "The number of training steps when doing init-model training"
doc_init_model_start_lr = "The starting learning rate when doing init-model training"
doc_init_model_start_pref_e = (
    "The starting energy prefactor in the loss when doing init-model training"
)
doc_init_model_start_pref_f = (
    "The starting force prefactor in the loss when doing init-model training"
)
doc_init_model_start_pref_v = (
    "The starting virial prefactor in the loss when doing init-model training"
)
return [
Argument(
"init_model_policy",
str,
optional=True,
default="no",
doc=doc_init_model_policy,
),
Argument(
"init_model_old_ratio",
float,
optional=True,
default=0.9,
doc=doc_init_model_old_ratio,
),
Argument(
"init_model_numb_steps",
int,
optional=True,
default=400000,
doc=doc_init_model_numb_steps,
alias=["init_model_stop_batch"],
),
Argument(
"init_model_start_lr",
float,
optional=True,
default=1e-4,
doc=doc_init_model_start_lr,
),
Argument(
"init_model_start_pref_e",
float,
optional=True,
default=0.1,
doc=doc_init_model_start_pref_e,
),
Argument(
"init_model_start_pref_f",
float,
optional=True,
default=100,
doc=doc_init_model_start_pref_f,
),
Argument(
"init_model_start_pref_v",
float,
optional=True,
default=0.0,
doc=doc_init_model_start_pref_v,
),
]
@staticmethod
def normalize_config(data={}):
ta = RunDPTrain.training_args()
base = Argument("base", dict, ta)
data = base.normalize_value(data, trim_pattern="_*")
base.check_value(data, strict=True)
return data
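# Usage sketch: normalize_config fills in the defaults declared in training_args,
# e.g. (hypothetical call)
#   RunDPTrain.normalize_config({"init_model_policy": "yes"})
#   -> {"init_model_policy": "yes", "init_model_old_ratio": 0.9,
#       "init_model_numb_steps": 400000, "init_model_start_lr": 1e-4,
#       "init_model_start_pref_e": 0.1, "init_model_start_pref_f": 100,
#       "init_model_start_pref_v": 0.0}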
def _get_data_size_of_system(data_dir):
ss = dpdata.System(data_dir, fmt="deepmd/npy")
return ss.get_nframes()
def _get_data_size_of_all_systems(data_dirs):
count = 0
for ii in data_dirs:
count += _get_data_size_of_system(ii)
return count
def _get_data_size_of_mult_sys(data_dir):
ms = dpdata.MultiSystems()
ms.from_deepmd_npy(data_dir)
return ms.get_nframes()
def _get_data_size_of_all_mult_sys(data_dirs):
count = 0
for ii in data_dirs:
count += _get_data_size_of_mult_sys(ii)
return count
def _expand_multi_sys_to_sys(multi_sys_dir):
all_type_raws = sorted(glob.glob(os.path.join(multi_sys_dir, "*", "type.raw")))
all_sys_dirs = [str(Path(ii).parent) for ii in all_type_raws]
return all_sys_dirs
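# Example (hypothetical paths): for a MultiSystems directory containing
#   iter_data/sys.000/type.raw and iter_data/sys.001/type.raw
# this returns ["iter_data/sys.000", "iter_data/sys.001"].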
def _expand_all_multi_sys_to_sys(list_multi_sys):
all_sys_dirs = []
for ii in list_multi_sys:
all_sys_dirs = all_sys_dirs + _expand_multi_sys_to_sys(ii)
return all_sys_dirs
config_args = RunDPTrain.training_args
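# `config_args` exposes the training-task arguments so that other modules can
# normalize or document the config, e.g. (sketch, assuming dargs' gen_doc):
#   print(Argument("config", dict, RunDPTrain.training_args()).gen_doc())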