import os
from typing import List, Optional, TYPE_CHECKING, Union
import numpy as np
from deepmd.common import make_default_mesh
from deepmd.env import default_tf_session_config, tf, MODEL_VERSION
from deepmd.utils.sess import run_sess
from deepmd.utils.batch_size import AutoBatchSize
if TYPE_CHECKING:
from pathlib import Path
class DeepEval:
    """Common methods for DeepPot, DeepWFC, DeepPolar, ...

    Parameters
    ----------
    model_file : Path
        The name of the frozen model file.
    load_prefix: str
        The prefix in the load computational graph
    default_tf_graph : bool
        If uses the default tf graph, otherwise build a new tf graph for evaluation
    auto_batch_size : bool or int or AutomaticBatchSize, default: False
        If True, automatic batch size will be used. If int, it will be used
        as the initial batch size.
    """

    # Lazily-resolved caches backing the model_type / model_version properties.
    _model_type: Optional[str] = None
    _model_version: Optional[str] = None
    load_prefix: str  # set by subclass

    def __init__(
        self,
        model_file: "Path",
        load_prefix: str = "load",
        default_tf_graph: bool = False,
        auto_batch_size: Union[bool, int, AutoBatchSize] = False,
    ):
        self.graph = self._load_graph(
            model_file, prefix=load_prefix, default_tf_graph=default_tf_graph
        )
        self.load_prefix = load_prefix
        # _graph_compatable must be called after graph and prefix are set,
        # since model_version reads a tensor out of self.graph.
        if not self._graph_compatable():
            raise RuntimeError(
                f"model in graph (version {self.model_version}) is incompatible "
                f"with the model (version {MODEL_VERSION}) supported by the current code."
            )
        # Normalize auto_batch_size to None (disabled) or an AutoBatchSize
        # instance.  Default is disabled, as subclasses may not support it.
        # NOTE: the bool check must come before the int check because bool is
        # a subclass of int.
        if isinstance(auto_batch_size, bool):
            self.auto_batch_size = AutoBatchSize() if auto_batch_size else None
        elif isinstance(auto_batch_size, int):
            self.auto_batch_size = AutoBatchSize(auto_batch_size)
        elif isinstance(auto_batch_size, AutoBatchSize):
            self.auto_batch_size = auto_batch_size
        else:
            raise TypeError("auto_batch_size should be bool, int, or AutoBatchSize")

    @property
    def model_type(self) -> str:
        """Get type of model.

        :type:str
        """
        # Resolved once from the "model_attr/model_type" tensor, then cached.
        if not self._model_type:
            t_mt = self._get_tensor("model_attr/model_type:0")
            sess = tf.Session(graph=self.graph, config=default_tf_session_config)
            [mt] = run_sess(sess, [t_mt], feed_dict={})
            self._model_type = mt.decode("utf-8")
        return self._model_type

    @property
    def model_version(self) -> str:
        """Get version of model.

        Returns
        -------
        str
            version of model
        """
        if not self._model_version:
            try:
                t_mt = self._get_tensor("model_attr/model_version:0")
            except KeyError:
                # For deepmd-kit version 0.x - 1.x, the graph carries no
                # version tensor; treat it as model version 0.0.
                self._model_version = "0.0"
            else:
                sess = tf.Session(graph=self.graph, config=default_tf_session_config)
                [mt] = run_sess(sess, [t_mt], feed_dict={})
                self._model_version = mt.decode("utf-8")
        return self._model_version

    def _graph_compatable(self) -> bool:
        """Check the model compatability.

        Returns
        -------
        bool
            If the model stored in the graph file is compatable with the current code
        """
        model_version_major = int(self.model_version.split(".")[0])
        model_version_minor = int(self.model_version.split(".")[1])
        MODEL_VERSION_MAJOR = int(MODEL_VERSION.split(".")[0])
        MODEL_VERSION_MINOR = int(MODEL_VERSION.split(".")[1])
        # Compatible iff the major versions match and the model's minor
        # version is not newer than what this code supports.
        return (
            model_version_major == MODEL_VERSION_MAJOR
            and model_version_minor <= MODEL_VERSION_MINOR
        )

    def _get_tensor(
        self, tensor_name: str, attr_name: Optional[str] = None
    ) -> tf.Tensor:
        """Get TF graph tensor and assign it to class namespace.

        Parameters
        ----------
        tensor_name : str
            name of tensor to get
        attr_name : Optional[str], optional
            if specified, class attribute with this name will be created and tensor will
            be assigned to it, by default None

        Returns
        -------
        tf.Tensor
            loaded tensor
        """
        # Tensor names in the graph are namespaced under the load prefix.
        tensor_path = os.path.join(self.load_prefix, tensor_name)
        tensor = self.graph.get_tensor_by_name(tensor_path)
        if attr_name:
            setattr(self, attr_name, tensor)
        return tensor

    @staticmethod
    def _load_graph(
        frozen_graph_filename: "Path",
        prefix: str = "load",
        default_tf_graph: bool = False,
    ):
        """Load a frozen graph from disk into a TF graph.

        Parameters
        ----------
        frozen_graph_filename : Path
            path of the frozen protobuf model file
        prefix : str
            name prefix under which the graph is imported
        default_tf_graph : bool
            if True import into the process-wide default graph,
            otherwise import into a fresh private graph

        Returns
        -------
        tf.Graph
            the graph containing the imported model
        """
        # We load the protobuf file from the disk and parse it to retrieve the
        # unserialized graph_def
        with tf.gfile.GFile(str(frozen_graph_filename), "rb") as f:
            graph_def = tf.GraphDef()
            graph_def.ParseFromString(f.read())
            if default_tf_graph:
                tf.import_graph_def(
                    graph_def,
                    input_map=None,
                    return_elements=None,
                    name=prefix,
                    producer_op_list=None,
                )
                graph = tf.get_default_graph()
            else:
                # Then, we can use again a convenient built-in function to
                # import a graph_def into the current default Graph
                with tf.Graph().as_default() as graph:
                    tf.import_graph_def(
                        graph_def,
                        input_map=None,
                        return_elements=None,
                        name=prefix,
                        producer_op_list=None,
                    )
            return graph

    @staticmethod
    def reverse_map(vec: np.ndarray, imap: List[int]) -> np.ndarray:
        """Reverse mapping of a vector according to the index map

        Parameters
        ----------
        vec
            Input vector. Be of shape [nframes, natoms, -1]
        imap
            Index map. Be of shape [natoms]

        Returns
        -------
        vec_out
            Reverse mapped vector.
        """
        ret = np.zeros(vec.shape)
        # Scatter: atom idx of the input goes to position imap[idx] in the
        # output, i.e. ret[:, imap[idx], :] = vec[:, idx, :] for every idx.
        ret[:, imap, :] = vec
        return ret

    def make_natoms_vec(self, atom_types: np.ndarray) -> np.ndarray:
        """Make the natom vector used by deepmd-kit.

        Parameters
        ----------
        atom_types
            The type of atoms

        Returns
        -------
        natoms
            The number of atoms. This tensor has the length of Ntypes + 2
            natoms[0]: number of local atoms
            natoms[1]: total number of atoms held by this processor
            natoms[i]: 2 <= i < Ntypes+2, number of type i atoms
        """
        natoms_vec = np.zeros(self.ntypes + 2).astype(int)
        natoms = atom_types.size
        natoms_vec[0] = natoms
        natoms_vec[1] = natoms
        # Per-type counts occupy slots 2 .. ntypes+1.
        for ii in range(self.ntypes):
            natoms_vec[ii + 2] = np.count_nonzero(atom_types == ii)
        return natoms_vec