# SPDX-License-Identifier: LGPL-3.0-or-later
import functools
import logging
from typing import (
Optional,
)
import torch
from deepmd.dpmodel import (
FittingOutputDef,
)
from deepmd.pt.model.descriptor.base_descriptor import (
BaseDescriptor,
)
from deepmd.pt.model.task.base_fitting import (
BaseFitting,
)
from deepmd.utils.path import (
DPPath,
)
from deepmd.utils.version import (
check_version_compatibility,
)
from .base_atomic_model import (
BaseAtomicModel,
)
[docs]
log = logging.getLogger(__name__)
@BaseAtomicModel.register("standard")
[docs]
class DPAtomicModel(BaseAtomicModel):
"""Model give atomic prediction of some physical property.
Parameters
----------
descriptor
Descriptor
fitting_net
Fitting net
type_map
Mapping atom type to the name (str) of the type.
For example `type_map[1]` gives the name of the type 1.
"""
def __init__(
self,
descriptor,
fitting,
type_map: list[str],
**kwargs,
) -> None:
super().__init__(type_map, **kwargs)
ntypes = len(type_map)
[docs]
self.type_map = type_map
[docs]
self.descriptor = descriptor
[docs]
self.rcut = self.descriptor.get_rcut()
[docs]
self.sel = self.descriptor.get_sel()
[docs]
self.fitting_net = fitting
super().init_out_stat()
[docs]
self.enable_eval_descriptor_hook = False
[docs]
self.eval_descriptor_list = []
eval_descriptor_list: list[torch.Tensor]
[docs]
def set_eval_descriptor_hook(self, enable: bool) -> None:
"""Set the hook for evaluating descriptor and clear the cache for descriptor list."""
self.enable_eval_descriptor_hook = enable
self.eval_descriptor_list = []
[docs]
def eval_descriptor(self) -> torch.Tensor:
"""Evaluate the descriptor."""
return torch.concat(self.eval_descriptor_list)
@torch.jit.export
[docs]
def fitting_output_def(self) -> FittingOutputDef:
"""Get the output def of the fitting net."""
return (
self.fitting_net.output_def()
if self.fitting_net is not None
else self.coord_denoise_net.output_def()
)
@torch.jit.export
[docs]
def get_rcut(self) -> float:
"""Get the cut-off radius."""
return self.rcut
[docs]
def get_sel(self) -> list[int]:
"""Get the neighbor selection."""
return self.sel
[docs]
def mixed_types(self) -> bool:
"""If true, the model
1. assumes total number of atoms aligned across frames;
2. uses a neighbor list that does not distinguish different atomic types.
If false, the model
1. assumes total number of atoms of each atom type aligned across frames;
2. uses a neighbor list that distinguishes different atomic types.
"""
return self.descriptor.mixed_types()
[docs]
def change_type_map(
self, type_map: list[str], model_with_new_type_stat=None
) -> None:
"""Change the type related params to new ones, according to `type_map` and the original one in the model.
If there are new types in `type_map`, statistics will be updated accordingly to `model_with_new_type_stat` for these new types.
"""
super().change_type_map(
type_map=type_map, model_with_new_type_stat=model_with_new_type_stat
)
self.type_map = type_map
self.ntypes = len(type_map)
self.descriptor.change_type_map(
type_map=type_map,
model_with_new_type_stat=model_with_new_type_stat.descriptor
if model_with_new_type_stat is not None
else None,
)
self.fitting_net.change_type_map(type_map=type_map)
[docs]
def has_message_passing(self) -> bool:
"""Returns whether the atomic model has message passing."""
return self.descriptor.has_message_passing()
[docs]
def need_sorted_nlist_for_lower(self) -> bool:
"""Returns whether the atomic model needs sorted nlist when using `forward_lower`."""
return self.descriptor.need_sorted_nlist_for_lower()
[docs]
def serialize(self) -> dict:
dd = BaseAtomicModel.serialize(self)
dd.update(
{
"@class": "Model",
"@version": 2,
"type": "standard",
"type_map": self.type_map,
"descriptor": self.descriptor.serialize(),
"fitting": self.fitting_net.serialize(),
}
)
return dd
@classmethod
[docs]
def deserialize(cls, data) -> "DPAtomicModel":
data = data.copy()
check_version_compatibility(data.pop("@version", 1), 2, 1)
data.pop("@class", None)
data.pop("type", None)
descriptor_obj = BaseDescriptor.deserialize(data.pop("descriptor"))
fitting_obj = BaseFitting.deserialize(data.pop("fitting"))
data["descriptor"] = descriptor_obj
data["fitting"] = fitting_obj
obj = super().deserialize(data)
return obj
[docs]
def enable_compression(
self,
min_nbor_dist: float,
table_extrapolate: float = 5,
table_stride_1: float = 0.01,
table_stride_2: float = 0.1,
check_frequency: int = -1,
) -> None:
"""Call descriptor enable_compression().
Parameters
----------
min_nbor_dist
The nearest distance between atoms
table_extrapolate
The scale of model extrapolation
table_stride_1
The uniform stride of the first table
table_stride_2
The uniform stride of the second table
check_frequency
The overflow check frequency
"""
self.descriptor.enable_compression(
min_nbor_dist,
table_extrapolate,
table_stride_1,
table_stride_2,
check_frequency,
)
[docs]
def forward_atomic(
self,
extended_coord,
extended_atype,
nlist,
mapping: Optional[torch.Tensor] = None,
fparam: Optional[torch.Tensor] = None,
aparam: Optional[torch.Tensor] = None,
comm_dict: Optional[dict[str, torch.Tensor]] = None,
) -> dict[str, torch.Tensor]:
"""Return atomic prediction.
Parameters
----------
extended_coord
coordinates in extended region
extended_atype
atomic type in extended region
nlist
neighbor list. nf x nloc x nsel
mapping
mapps the extended indices to local indices
fparam
frame parameter. nf x ndf
aparam
atomic parameter. nf x nloc x nda
Returns
-------
result_dict
the result dict, defined by the `FittingOutputDef`.
"""
nframes, nloc, nnei = nlist.shape
atype = extended_atype[:, :nloc]
if self.do_grad_r() or self.do_grad_c():
extended_coord.requires_grad_(True)
descriptor, rot_mat, g2, h2, sw = self.descriptor(
extended_coord,
extended_atype,
nlist,
mapping=mapping,
comm_dict=comm_dict,
)
assert descriptor is not None
if self.enable_eval_descriptor_hook:
self.eval_descriptor_list.append(descriptor)
# energy, force
fit_ret = self.fitting_net(
descriptor,
atype,
gr=rot_mat,
g2=g2,
h2=h2,
fparam=fparam,
aparam=aparam,
)
return fit_ret
[docs]
def get_out_bias(self) -> torch.Tensor:
return self.out_bias
[docs]
def compute_or_load_stat(
self,
sampled_func,
stat_file_path: Optional[DPPath] = None,
) -> None:
"""
Compute or load the statistics parameters of the model,
such as mean and standard deviation of descriptors or the energy bias of the fitting net.
When `sampled` is provided, all the statistics parameters will be calculated (or re-calculated for update),
and saved in the `stat_file_path`(s).
When `sampled` is not provided, it will check the existence of `stat_file_path`(s)
and load the calculated statistics parameters.
Parameters
----------
sampled_func
The lazy sampled function to get data frames from different data systems.
stat_file_path
The dictionary of paths to the statistics files.
"""
if stat_file_path is not None and self.type_map is not None:
# descriptors and fitting net with different type_map
# should not share the same parameters
stat_file_path /= " ".join(self.type_map)
@functools.lru_cache
def wrapped_sampler():
sampled = sampled_func()
if self.pair_excl is not None:
pair_exclude_types = self.pair_excl.get_exclude_types()
for sample in sampled:
sample["pair_exclude_types"] = list(pair_exclude_types)
if self.atom_excl is not None:
atom_exclude_types = self.atom_excl.get_exclude_types()
for sample in sampled:
sample["atom_exclude_types"] = list(atom_exclude_types)
return sampled
self.descriptor.compute_input_stats(wrapped_sampler, stat_file_path)
self.compute_or_load_out_stat(wrapped_sampler, stat_file_path)
[docs]
def get_dim_fparam(self) -> int:
"""Get the number (dimension) of frame parameters of this atomic model."""
return self.fitting_net.get_dim_fparam()
[docs]
def get_dim_aparam(self) -> int:
"""Get the number (dimension) of atomic parameters of this atomic model."""
return self.fitting_net.get_dim_aparam()
[docs]
def get_sel_type(self) -> list[int]:
"""Get the selected atom types of this model.
Only atoms with selected atom types have atomic contribution
to the result of the model.
If returning an empty list, all atom types are selected.
"""
return self.fitting_net.get_sel_type()
[docs]
def is_aparam_nall(self) -> bool:
"""Check whether the shape of atomic parameters is (nframes, nall, ndim).
If False, the shape is (nframes, nloc, ndim).
"""
return False