Source code for deepmd.infer.deep_pot

# SPDX-License-Identifier: LGPL-3.0-or-later
import logging
from typing import (
    TYPE_CHECKING,
    Callable,
    List,
    Optional,
    Tuple,
    Union,
)

import numpy as np

from deepmd.common import (
    make_default_mesh,
)
from deepmd.infer.data_modifier import (
    DipoleChargeModifier,
)
from deepmd.infer.deep_eval import (
    DeepEval,
)
from deepmd.utils.batch_size import (
    AutoBatchSize,
)
from deepmd.utils.sess import (
    run_sess,
)

if TYPE_CHECKING:
    from pathlib import (
        Path,
    )

log = logging.getLogger(__name__)


[docs]class DeepPot(DeepEval): """Constructor. Parameters ---------- model_file : Path The name of the frozen model file. load_prefix: str The prefix in the load computational graph default_tf_graph : bool If uses the default tf graph, otherwise build a new tf graph for evaluation auto_batch_size : bool or int or AutomaticBatchSize, default: True If True, automatic batch size will be used. If int, it will be used as the initial batch size. input_map : dict, optional The input map for tf.import_graph_def. Only work with default tf graph neighbor_list : ase.neighborlist.NewPrimitiveNeighborList, optional The ASE neighbor list class to produce the neighbor list. If None, the neighbor list will be built natively in the model. Examples -------- >>> from deepmd.infer import DeepPot >>> import numpy as np >>> dp = DeepPot('graph.pb') >>> coord = np.array([[1,0,0], [0,0,1.5], [1,0,3]]).reshape([1, -1]) >>> cell = np.diag(10 * np.ones(3)).reshape([1, -1]) >>> atype = [1,0,1] >>> e, f, v = dp.eval(coord, cell, atype) where `e`, `f` and `v` are predicted energy, force and virial of the system, respectively. Warnings -------- For developers: `DeepTensor` initializer must be called at the end after `self.tensors` are modified because it uses the data in `self.tensors` dict. Do not chanage the order! """ def __init__( self, model_file: "Path", load_prefix: str = "load", default_tf_graph: bool = False, auto_batch_size: Union[bool, int, AutoBatchSize] = True, input_map: Optional[dict] = None, neighbor_list=None, ) -> None: # add these tensors on top of what is defined by DeepTensor Class # use this in favor of dict update to move attribute from class to # instance namespace self.tensors = { # descrpt attrs "t_ntypes": "descrpt_attr/ntypes:0", "t_rcut": "descrpt_attr/rcut:0", # fitting attrs "t_dfparam": "fitting_attr/dfparam:0", "t_daparam": "fitting_attr/daparam:0", # model attrs "t_tmap": "model_attr/tmap:0", # inputs "t_coord": "t_coord:0", "t_type": "t_type:0", "t_natoms": "t_natoms:0", "t_box": "t_box:0", "t_mesh": "t_mesh:0", # add output tensors "t_energy": "o_energy:0", "t_force": "o_force:0", "t_virial": "o_virial:0", "t_ae": "o_atom_energy:0", "t_av": "o_atom_virial:0", "t_descriptor": "o_descriptor:0", } DeepEval.__init__( self, model_file, load_prefix=load_prefix, default_tf_graph=default_tf_graph, auto_batch_size=auto_batch_size, input_map=input_map, neighbor_list=neighbor_list, ) # load optional tensors operations = [op.name for op in self.graph.get_operations()] # check if the graph has these operations: # if yes add them if ("%s/t_efield" % load_prefix) in operations: self.tensors.update({"t_efield": "t_efield:0"}) self.has_efield = True else: log.debug("Could not get tensor 't_efield:0'") self.t_efield = None self.has_efield = False if ("%s/t_fparam" % load_prefix) in operations: self.tensors.update({"t_fparam": "t_fparam:0"}) self.has_fparam = True else: log.debug("Could not get tensor 't_fparam:0'") self.t_fparam = None self.has_fparam = False if ("%s/t_aparam" % load_prefix) in operations: self.tensors.update({"t_aparam": "t_aparam:0"}) self.has_aparam = True else: log.debug("Could not get tensor 't_aparam:0'") self.t_aparam = None self.has_aparam = False if ("%s/spin_attr/ntypes_spin" % load_prefix) in operations: self.tensors.update({"t_ntypes_spin": "spin_attr/ntypes_spin:0"}) self.has_spin = True else: self.ntypes_spin = 0 self.has_spin = False # now load tensors to object attributes for attr_name, tensor_name in self.tensors.items(): try: self._get_tensor(tensor_name, attr_name) except KeyError: if attr_name != "t_descriptor": raise self._run_default_sess() self.tmap = self.tmap.decode("UTF-8").split() # setup modifier try: t_modifier_type = self._get_tensor("modifier_attr/type:0") self.modifier_type = run_sess(self.sess, t_modifier_type).decode("UTF-8") except (ValueError, KeyError): self.modifier_type = None try: t_jdata = self._get_tensor("train_attr/training_script:0") jdata = run_sess(self.sess, t_jdata).decode("UTF-8") import json jdata = json.loads(jdata) self.descriptor_type = jdata["model"]["descriptor"]["type"] except (ValueError, KeyError): self.descriptor_type = None if self.modifier_type == "dipole_charge": t_mdl_name = self._get_tensor("modifier_attr/mdl_name:0") t_mdl_charge_map = self._get_tensor("modifier_attr/mdl_charge_map:0") t_sys_charge_map = self._get_tensor("modifier_attr/sys_charge_map:0") t_ewald_h = self._get_tensor("modifier_attr/ewald_h:0") t_ewald_beta = self._get_tensor("modifier_attr/ewald_beta:0") [mdl_name, mdl_charge_map, sys_charge_map, ewald_h, ewald_beta] = run_sess( self.sess, [ t_mdl_name, t_mdl_charge_map, t_sys_charge_map, t_ewald_h, t_ewald_beta, ], ) mdl_name = mdl_name.decode("UTF-8") mdl_charge_map = [int(ii) for ii in mdl_charge_map.decode("UTF-8").split()] sys_charge_map = [int(ii) for ii in sys_charge_map.decode("UTF-8").split()] self.dm = DipoleChargeModifier( mdl_name, mdl_charge_map, sys_charge_map, ewald_h=ewald_h, ewald_beta=ewald_beta, ) def _run_default_sess(self): if self.has_spin is True: [ self.ntypes, self.ntypes_spin, self.rcut, self.dfparam, self.daparam, self.tmap, ] = run_sess( self.sess, [ self.t_ntypes, self.t_ntypes_spin, self.t_rcut, self.t_dfparam, self.t_daparam, self.t_tmap, ], ) else: [self.ntypes, self.rcut, self.dfparam, self.daparam, self.tmap] = run_sess( self.sess, [ self.t_ntypes, self.t_rcut, self.t_dfparam, self.t_daparam, self.t_tmap, ], )
[docs] def get_ntypes(self) -> int: """Get the number of atom types of this model.""" return self.ntypes
[docs] def get_ntypes_spin(self): """Get the number of spin atom types of this model.""" return self.ntypes_spin
[docs] def get_rcut(self) -> float: """Get the cut-off radius of this model.""" return self.rcut
[docs] def get_type_map(self) -> List[str]: """Get the type map (element name of the atom types) of this model.""" return self.tmap
[docs] def get_sel_type(self) -> List[int]: """Unsupported in this model.""" raise NotImplementedError("This model type does not support this attribute")
[docs] def get_descriptor_type(self) -> List[int]: """Get the descriptor type of this model.""" return self.descriptor_type
[docs] def get_dim_fparam(self) -> int: """Get the number (dimension) of frame parameters of this DP.""" return self.dfparam
[docs] def get_dim_aparam(self) -> int: """Get the number (dimension) of atomic parameters of this DP.""" return self.daparam
def _eval_func(self, inner_func: Callable, numb_test: int, natoms: int) -> Callable: """Wrapper method with auto batch size. Parameters ---------- inner_func : Callable the method to be wrapped numb_test : int number of tests natoms : int number of atoms Returns ------- Callable the wrapper """ if self.auto_batch_size is not None: def eval_func(*args, **kwargs): return self.auto_batch_size.execute_all( inner_func, numb_test, natoms, *args, **kwargs ) else: eval_func = inner_func return eval_func def _get_natoms_and_nframes( self, coords: np.ndarray, atom_types: Union[List[int], np.ndarray], mixed_type: bool = False, ) -> Tuple[int, int]: if mixed_type: natoms = len(atom_types[0]) else: natoms = len(atom_types) if natoms == 0: assert coords.size == 0 else: coords = np.reshape(np.array(coords), [-1, natoms * 3]) nframes = coords.shape[0] return natoms, nframes
[docs] def eval( self, coords: np.ndarray, cells: np.ndarray, atom_types: List[int], atomic: bool = False, fparam: Optional[np.ndarray] = None, aparam: Optional[np.ndarray] = None, efield: Optional[np.ndarray] = None, mixed_type: bool = False, ) -> Tuple[np.ndarray, ...]: """Evaluate the energy, force and virial by using this DP. Parameters ---------- coords The coordinates of atoms. The array should be of size nframes x natoms x 3 cells The cell of the region. If None then non-PBC is assumed, otherwise using PBC. The array should be of size nframes x 9 atom_types The atom types The list should contain natoms ints atomic Calculate the atomic energy and virial fparam The frame parameter. The array can be of size : - nframes x dim_fparam. - dim_fparam. Then all frames are assumed to be provided with the same fparam. aparam The atomic parameter The array can be of size : - nframes x natoms x dim_aparam. - natoms x dim_aparam. Then all frames are assumed to be provided with the same aparam. - dim_aparam. Then all frames and atoms are provided with the same aparam. efield The external field on atoms. The array should be of size nframes x natoms x 3 mixed_type Whether to perform the mixed_type mode. If True, the input data has the mixed_type format (see doc/model/train_se_atten.md), in which frames in a system may have different natoms_vec(s), with the same nloc. Returns ------- energy The system energy. force The force on each atom virial The virial atom_energy The atomic energy. Only returned when atomic == True atom_virial The atomic virial. Only returned when atomic == True """ # reshape coords before getting shape natoms, numb_test = self._get_natoms_and_nframes( coords, atom_types, mixed_type=mixed_type ) output = self._eval_func(self._eval_inner, numb_test, natoms)( coords, cells, atom_types, fparam=fparam, aparam=aparam, atomic=atomic, efield=efield, mixed_type=mixed_type, ) if self.modifier_type is not None: if atomic: raise RuntimeError("modifier does not support atomic modification") me, mf, mv = self.dm.eval(coords, cells, atom_types) output = list(output) # tuple to list e, f, v = output[:3] output[0] += me.reshape(e.shape) output[1] += mf.reshape(f.shape) output[2] += mv.reshape(v.shape) output = tuple(output) return output
def _prepare_feed_dict( self, coords, cells, atom_types, fparam=None, aparam=None, efield=None, mixed_type=False, ): # standarize the shape of inputs natoms, nframes = self._get_natoms_and_nframes( coords, atom_types, mixed_type=mixed_type ) if mixed_type: atom_types = np.array(atom_types, dtype=int).reshape([-1, natoms]) else: atom_types = np.array(atom_types, dtype=int).reshape([-1]) coords = np.reshape(np.array(coords), [nframes, natoms * 3]) if cells is None: pbc = False # make cells to work around the requirement of pbc cells = np.tile(np.eye(3), [nframes, 1]).reshape([nframes, 9]) else: pbc = True cells = np.array(cells).reshape([nframes, 9]) if self.has_fparam: assert fparam is not None fparam = np.array(fparam) if self.has_aparam: assert aparam is not None aparam = np.array(aparam) if self.has_efield: assert ( efield is not None ), "you are using a model with external field, parameter efield should be provided" efield = np.array(efield) # reshape the inputs if self.has_fparam: fdim = self.get_dim_fparam() if fparam.size == nframes * fdim: fparam = np.reshape(fparam, [nframes, fdim]) elif fparam.size == fdim: fparam = np.tile(fparam.reshape([-1]), [nframes, 1]) else: raise RuntimeError( "got wrong size of frame param, should be either %d x %d or %d" % (nframes, fdim, fdim) ) if self.has_aparam: fdim = self.get_dim_aparam() if aparam.size == nframes * natoms * fdim: aparam = np.reshape(aparam, [nframes, natoms * fdim]) elif aparam.size == natoms * fdim: aparam = np.tile(aparam.reshape([-1]), [nframes, 1]) elif aparam.size == fdim: aparam = np.tile(aparam.reshape([-1]), [nframes, natoms]) else: raise RuntimeError( "got wrong size of frame param, should be either %d x %d x %d or %d x %d or %d" % (nframes, natoms, fdim, natoms, fdim, fdim) ) # sort inputs coords, atom_types, imap = self.sort_input( coords, atom_types, mixed_type=mixed_type ) if self.has_efield: efield = np.reshape(efield, [nframes, natoms, 3]) efield = efield[:, imap, :] efield = np.reshape(efield, [nframes, natoms * 3]) if self.has_aparam: aparam = np.reshape(aparam, [nframes, natoms, fdim]) aparam = aparam[:, imap, :] aparam = np.reshape(aparam, [nframes, natoms * fdim]) # make natoms_vec and default_mesh if self.neighbor_list is None: natoms_vec = self.make_natoms_vec(atom_types, mixed_type=mixed_type) assert natoms_vec[0] == natoms mesh = make_default_mesh(pbc, mixed_type) ghost_map = None else: if nframes > 1: raise NotImplementedError( "neighbor_list does not support multiple frames" ) ( natoms_vec, coords, atom_types, mesh, imap, ghost_map, ) = self.build_neighbor_list( coords, cells if cells is not None else None, atom_types, imap, self.neighbor_list, ) # evaluate feed_dict_test = {} feed_dict_test[self.t_natoms] = natoms_vec if mixed_type: feed_dict_test[self.t_type] = atom_types.reshape([-1]) else: feed_dict_test[self.t_type] = np.tile(atom_types, [nframes, 1]).reshape( [-1] ) feed_dict_test[self.t_coord] = np.reshape(coords, [-1]) if len(self.t_box.shape) == 1: feed_dict_test[self.t_box] = np.reshape(cells, [-1]) elif len(self.t_box.shape) == 2: feed_dict_test[self.t_box] = cells else: raise RuntimeError if self.has_efield: feed_dict_test[self.t_efield] = np.reshape(efield, [-1]) feed_dict_test[self.t_mesh] = mesh if self.has_fparam: feed_dict_test[self.t_fparam] = np.reshape(fparam, [-1]) if self.has_aparam: feed_dict_test[self.t_aparam] = np.reshape(aparam, [-1]) return feed_dict_test, imap, natoms_vec, ghost_map def _eval_inner( self, coords, cells, atom_types, fparam=None, aparam=None, atomic=False, efield=None, mixed_type=False, ): natoms, nframes = self._get_natoms_and_nframes( coords, atom_types, mixed_type=mixed_type ) feed_dict_test, imap, natoms_vec, ghost_map = self._prepare_feed_dict( coords, cells, atom_types, fparam, aparam, efield, mixed_type=mixed_type ) nloc = natoms_vec[0] nall = natoms_vec[1] t_out = [self.t_energy, self.t_force, self.t_virial] if atomic: t_out += [self.t_ae, self.t_av] v_out = run_sess(self.sess, t_out, feed_dict=feed_dict_test) energy = v_out[0] force = v_out[1] virial = v_out[2] if atomic: ae = v_out[3] av = v_out[4] if self.has_spin: ntypes_real = self.ntypes - self.ntypes_spin natoms_real = sum( [ np.count_nonzero(np.array(atom_types) == ii) for ii in range(ntypes_real) ] ) else: natoms_real = natoms if ghost_map is not None: # add the value of ghost atoms to real atoms force = np.reshape(force, [nframes, -1, 3]) np.add.at(force[0], ghost_map, force[0, nloc:]) if atomic: av = np.reshape(av, [nframes, -1, 9]) np.add.at(av[0], ghost_map, av[0, nloc:]) # reverse map of the outputs force = self.reverse_map(np.reshape(force, [nframes, -1, 3]), imap) if atomic: ae = self.reverse_map(np.reshape(ae, [nframes, -1, 1]), imap[:natoms_real]) av = self.reverse_map(np.reshape(av, [nframes, -1, 9]), imap) energy = np.reshape(energy, [nframes, 1]) force = np.reshape(force, [nframes, nall, 3]) if nloc < nall: force = force[:, :nloc, :] virial = np.reshape(virial, [nframes, 9]) if atomic: ae = np.reshape(ae, [nframes, natoms_real, 1]) av = np.reshape(av, [nframes, nall, 9]) if nloc < nall: av = av[:, :nloc, :] return energy, force, virial, ae, av else: return energy, force, virial
[docs] def eval_descriptor( self, coords: np.ndarray, cells: np.ndarray, atom_types: List[int], fparam: Optional[np.ndarray] = None, aparam: Optional[np.ndarray] = None, efield: Optional[np.ndarray] = None, mixed_type: bool = False, ) -> np.array: """Evaluate descriptors by using this DP. Parameters ---------- coords The coordinates of atoms. The array should be of size nframes x natoms x 3 cells The cell of the region. If None then non-PBC is assumed, otherwise using PBC. The array should be of size nframes x 9 atom_types The atom types The list should contain natoms ints fparam The frame parameter. The array can be of size : - nframes x dim_fparam. - dim_fparam. Then all frames are assumed to be provided with the same fparam. aparam The atomic parameter The array can be of size : - nframes x natoms x dim_aparam. - natoms x dim_aparam. Then all frames are assumed to be provided with the same aparam. - dim_aparam. Then all frames and atoms are provided with the same aparam. efield The external field on atoms. The array should be of size nframes x natoms x 3 mixed_type Whether to perform the mixed_type mode. If True, the input data has the mixed_type format (see doc/model/train_se_atten.md), in which frames in a system may have different natoms_vec(s), with the same nloc. Returns ------- descriptor Descriptors. """ natoms, numb_test = self._get_natoms_and_nframes( coords, atom_types, mixed_type=mixed_type ) descriptor = self._eval_func(self._eval_descriptor_inner, numb_test, natoms)( coords, cells, atom_types, fparam=fparam, aparam=aparam, efield=efield, mixed_type=mixed_type, ) return descriptor
def _eval_descriptor_inner( self, coords: np.ndarray, cells: np.ndarray, atom_types: List[int], fparam: Optional[np.ndarray] = None, aparam: Optional[np.ndarray] = None, efield: Optional[np.ndarray] = None, mixed_type: bool = False, ) -> np.array: natoms, nframes = self._get_natoms_and_nframes( coords, atom_types, mixed_type=mixed_type ) feed_dict_test, imap, natoms_vec, ghost_map = self._prepare_feed_dict( coords, cells, atom_types, fparam, aparam, efield, mixed_type=mixed_type ) (descriptor,) = run_sess( self.sess, [self.t_descriptor], feed_dict=feed_dict_test ) imap = imap[:natoms] return self.reverse_map(np.reshape(descriptor, [nframes, natoms, -1]), imap)