import re
import math
import logging
import numpy as np
from typing import Callable
from typing import Tuple, List
from deepmd.env import tf
from deepmd.env import op_module
from deepmd.common import ACTIVATION_FN_DICT
from deepmd.utils.sess import run_sess
from deepmd.utils.graph import get_tensor_by_name_from_graph, load_graph_def
from deepmd.utils.graph import get_embedding_net_nodes_from_graph_def
from tensorflow.python.platform import gfile
from tensorflow.python.framework import tensor_util
log = logging.getLogger(__name__)
class DPTabulate():
    """
    Class for tabulation.

    Compress a model, which including tabulating the embedding-net.

    The table is composed of fifth-order polynomial coefficients and is
    assembled from two sub-tables. The first table takes the stride
    (parameter) as its uniform stride, while the second table takes
    10 * stride as its uniform stride. The range of the first table is
    automatically detected by deepmd-kit, while the second table ranges
    from the first table's upper boundary (upper) to the
    extrapolate (parameter) * upper.

    Parameters
    ----------
    model_file
        The frozen model
    type_one_side
        Try to build N_types tables. Otherwise, building N_types^2 tables
    exclude_types : List[List[int]]
        The excluded pairs of types which have no interaction with each other.
        For example, `[[0, 1]]` means no interaction between type 0 and type 1.
    activation_function
        The activation function in the embedding net. Supported options are {"tanh","gelu"} in common.ACTIVATION_FN_DICT.
    suffix : str, optional
        The suffix of the scope
    """
def __init__(self,
             model_file : str,
             type_one_side : bool = False,
             exclude_types : List[List[int]] = None,
             activation_fn : Callable[[tf.Tensor], tf.Tensor] = tf.nn.tanh,
             suffix : str = "",
) -> None:
    """
    Constructor

    Loads the frozen graph, locates the descriptor operation, and
    extracts the trained embedding-net variables needed for tabulation.

    Parameters
    ----------
    model_file
        Path of the frozen model to compress
    type_one_side
        Try to build N_types tables. Otherwise, building N_types^2 tables
    exclude_types : List[List[int]], optional
        The excluded pairs of types which have no interaction with each other.
        For example, `[[0, 1]]` means no interaction between type 0 and type 1.
    activation_fn
        The activation function in the embedding net; must be the "tanh" or
        "gelu" entry of common.ACTIVATION_FN_DICT.
    suffix : str, optional
        The suffix of the scope

    Raises
    ------
    RuntimeError
        If the options are inconsistent (see messages below).
    """
    self.model_file = model_file
    self.type_one_side = type_one_side
    # Normalize the excluded pairs to tuples: membership tests elsewhere use
    # `(i, j) in self.exclude_types`, and a tuple never compares equal to a
    # list, so storing raw List[List[int]] input would silently disable the
    # exclusion (and later KeyError when looking up non-existent net nodes).
    # `None` sentinel avoids the mutable-default-argument pitfall.
    self.exclude_types = [] if exclude_types is None \
        else [(tt[0], tt[1]) for tt in exclude_types]
    self.suffix = suffix
    if self.type_one_side and len(self.exclude_types) != 0:
        raise RuntimeError('"type_one_side" is not compatible with "exclude_types"')

    # functype tells the custom tabulation ops which activation to differentiate
    if activation_fn == ACTIVATION_FN_DICT["tanh"]:
        self.functype = 1
    elif activation_fn == ACTIVATION_FN_DICT["gelu"]:
        self.functype = 2
    else:
        raise RuntimeError("Unknown activation function type!")
    self.activation_fn = activation_fn

    self.graph, self.graph_def = load_graph_def(self.model_file)
    self.sess = tf.Session(graph = self.graph)
    # scratch graph/session used by _make_data for one-off evaluations
    self.sub_graph, self.sub_graph_def = self._load_sub_graph()
    self.sub_sess = tf.Session(graph = self.sub_graph)

    # Newer frozen graphs name the descriptor op 'ProdEnvMatA'; fall back to
    # the legacy 'DescrptSeA' name for models frozen by older versions.
    try:
        self.descrpt = self.graph.get_operation_by_name('ProdEnvMatA')
    except Exception:
        self.descrpt = self.graph.get_operation_by_name('DescrptSeA')
    self.sel_a = self.descrpt.get_attr('sel_a')

    self.davg = get_tensor_by_name_from_graph(self.graph, f'descrpt_attr{self.suffix}/t_avg')
    self.dstd = get_tensor_by_name_from_graph(self.graph, f'descrpt_attr{self.suffix}/t_std')
    self.ntypes = get_tensor_by_name_from_graph(self.graph, 'descrpt_attr/ntypes')

    self.rcut = self.descrpt.get_attr('rcut_r')
    self.rcut_smth = self.descrpt.get_attr('rcut_r_smth')

    self.embedding_net_nodes = get_embedding_net_nodes_from_graph_def(self.graph_def, suffix=self.suffix)

    for tt in self.exclude_types:
        if (tt[0] not in range(self.ntypes)) or (tt[1] not in range(self.ntypes)):
            raise RuntimeError("exclude types" + str(tt) + " must within the number of atomic types " + str(self.ntypes) + "!")
    if (self.ntypes * self.ntypes - len(self.exclude_types) == 0):
        raise RuntimeError("empty embedding-net are not supported in model compression!")

    # Each (non-excluded) embedding net contributes layer_size matrix nodes
    # and layer_size bias nodes, hence the division by 2 * (number of nets).
    self.layer_size = len(self.embedding_net_nodes) // ((self.ntypes * self.ntypes - len(self.exclude_types)) * 2)
    self.table_size = self.ntypes * self.ntypes
    if type_one_side :
        self.layer_size = len(self.embedding_net_nodes) // (self.ntypes * 2)
        self.table_size = self.ntypes

    # get trained variables
    self.bias = self._get_bias()
    self.matrix = self._get_matrix()

    # Infer the element dtype and the width of the last layer from any
    # non-excluded net's final weight matrix (excluded nets are empty arrays).
    for item in self.matrix["layer_" + str(self.layer_size)]:
        if len(item) != 0:
            self.data_type = type(item[0][0])
            self.last_layer_size = item.shape[1]

    # tabulated polynomial coefficients, keyed by net name (filled by build())
    self.data = {}
def build(self,
          min_nbor_dist : float,
          extrapolate : float,
          stride0 : float,
          stride1 : float) -> Tuple[int, int]:
    """
    Build the tables for model compression

    Parameters
    ----------
    min_nbor_dist
        The nearest distance between neighbor atoms
    extrapolate
        The scale of model extrapolation
    stride0
        The uniform stride of the first table
    stride1
        The uniform stride of the second table

    Returns
    -------
    lower
        The lower boundary of environment matrix
    upper
        The upper boundary of environment matrix
    """
    # tabulate range [lower, upper] with stride0 'stride0'
    lower, upper = self._get_env_mat_range(min_nbor_dist)
    # sample points: fine stride0 over [lower, upper), coarse stride1 over
    # [upper, extrapolate * upper), plus the final endpoint itself
    xx = np.arange(lower, upper, stride0, dtype = self.data_type)
    xx = np.append(xx, np.arange(upper, extrapolate * upper, stride1, dtype = self.data_type))
    xx = np.append(xx, np.array([extrapolate * upper], dtype = self.data_type))
    # total number of spline intervals across both sub-tables
    self.nspline = int((upper - lower) / stride0 + (extrapolate * upper - upper) / stride1)
    for ii in range(self.table_size):
        # skip embedding nets of excluded type pairs (they were never trained)
        if self.type_one_side or (ii // self.ntypes, int(ii % self.ntypes)) not in self.exclude_types:
            # net output and its first/second derivatives at every sample point
            vv, dd, d2 = self._make_data(xx, ii)
            if self.type_one_side:
                net = "filter_-1_net_" + str(ii)
            else:
                net = "filter_" + str(ii // self.ntypes) + "_net_" + str(int(ii % self.ntypes))
            # six fifth-order polynomial coefficients per output channel per interval
            self.data[net] = np.zeros([self.nspline, 6 * self.last_layer_size], dtype = self.data_type)
            # for jj in tqdm(range(self.nspline), desc = 'DEEPMD INFO |-> deepmd.utils.tabulate\t\t\t' + net + ', tabulating'):
            for jj in range(self.nspline):
                for kk in range(self.last_layer_size):
                    # interval width: stride0 in the first table, stride1 beyond
                    if jj < int((upper - lower) / stride0):
                        tt = stride0
                    else:
                        tt = stride1
                    # Quintic Hermite fit on [x_j, x_j + tt]: coefficients 0-2
                    # come from the left endpoint's value/1st/2nd derivative;
                    # 3-5 are chosen so that value and both derivatives also
                    # match at the right endpoint (hh is the value jump).
                    hh = vv[jj + 1][kk] - vv[jj][kk]
                    self.data[net][jj][kk * 6 + 0] = vv[jj][kk]
                    self.data[net][jj][kk * 6 + 1] = dd[jj][kk]
                    self.data[net][jj][kk * 6 + 2] = 0.5 * d2[jj][kk]
                    self.data[net][jj][kk * 6 + 3] = (1 / (2 * tt * tt * tt)) * (20 * hh - (8 * dd[jj + 1][kk] + 12 * dd[jj][kk]) * tt - (3 * d2[jj][kk] - d2[jj + 1][kk]) * tt * tt)
                    self.data[net][jj][kk * 6 + 4] = (1 / (2 * tt * tt * tt * tt)) * (-30 * hh + (14 * dd[jj + 1][kk] + 16 * dd[jj][kk]) * tt + (3 * d2[jj][kk] - 2 * d2[jj + 1][kk]) * tt * tt)
                    self.data[net][jj][kk * 6 + 5] = (1 / (2 * tt * tt * tt * tt * tt)) * (12 * hh - 6 * (dd[jj + 1][kk] + dd[jj][kk]) * tt + (d2[jj + 1][kk] - d2[jj][kk]) * tt * tt)
    return lower, upper
def _load_sub_graph(self):
    """Create an empty scratch graph (plus its GraphDef) that _make_data
    later uses for one-off evaluations of the embedding nets."""
    scratch_def = tf.GraphDef()
    with tf.Graph().as_default() as scratch_graph:
        tf.import_graph_def(scratch_def, name = "")
    return scratch_graph, scratch_def
def _get_bias(self):
    """Collect the bias vectors of every embedding net, layer by layer."""
    return self._get_network_variables("bias")

def _get_matrix(self):
    """Collect the weight matrices of every embedding net, layer by layer."""
    return self._get_network_variables("matrix")

def _get_network_variables(self, var_name):
    """Gather the embedding-net variables named ``var_name``.

    Shared implementation of _get_bias / _get_matrix (previously two
    copy-pasted loops differing only in the node-name substring).

    Parameters
    ----------
    var_name : str
        Either "bias" or "matrix" — the variable kind encoded in the
        frozen-graph node names.

    Returns
    -------
    dict
        Maps "layer_<l>" (l = 1 .. layer_size) to a list holding one numpy
        array per embedding net; excluded type pairs get an empty-array
        placeholder so list indices still line up with the net ordering.
    """
    variables = {}
    for layer in range(1, self.layer_size + 1):
        key = "layer_" + str(layer)
        variables[key] = []
        if self.type_one_side:
            for ii in range(0, self.ntypes):
                node = self.embedding_net_nodes[f"filter_type_all{self.suffix}/{var_name}_{layer}_{ii}"]
                variables[key].append(self._tensor_to_array(node))
        else:
            for ii in range(0, self.ntypes * self.ntypes):
                if (ii // self.ntypes, int(ii % self.ntypes)) not in self.exclude_types:
                    node = self.embedding_net_nodes[f"filter_type_{ii // self.ntypes}{self.suffix}/{var_name}_{layer}_{ii % self.ntypes}"]
                    variables[key].append(self._tensor_to_array(node))
                else:
                    # placeholder for an excluded (hence untrained) net
                    variables[key].append(np.array([]))
    return variables

def _tensor_to_array(self, node):
    """Decode a frozen-graph tensor proto into a shaped numpy array."""
    value = np.frombuffer(node.tensor_content, dtype = tf.as_dtype(node.dtype).as_numpy_dtype)
    shape = tf.TensorShape(node.tensor_shape).as_list()
    return np.reshape(value, shape)
# one-by-one executions
def _make_data(self, xx, idx):
    """
    Evaluate embedding net ``idx`` at the sample points ``xx``.

    Replays the stored weights (``self.matrix``) and biases (``self.bias``)
    layer by layer in the scratch sub-graph/session, using the custom
    ``unaggregated_dy*_dx*`` ops to propagate first and second derivatives
    with respect to the input alongside the forward pass.

    Returns
    -------
    vv, dd, d2
        Numpy arrays: the net output and its first and second derivatives at
        each sample point (presumably shaped (len(xx), last_layer_size) —
        that is how build() indexes them; confirm against the op definitions).
    """
    with self.sub_graph.as_default():
        with self.sub_sess.as_default():
            # column vector of sample points
            xx = tf.reshape(xx, [xx.size, -1])
            for layer in range(self.layer_size):
                if layer == 0:
                    # pre-activation of the first layer
                    xbar = tf.matmul(
                        xx, self.matrix["layer_" + str(layer + 1)][idx]) + self.bias["layer_" + str(layer + 1)][idx]
                    yy = self._layer_0(
                        xx, self.matrix["layer_" + str(layer + 1)][idx], self.bias["layer_" + str(layer + 1)][idx])
                    # first and second derivative of yy w.r.t. the input
                    dy = op_module.unaggregated_dy_dx_s(
                        yy, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype))
                    dy2 = op_module.unaggregated_dy2_dx_s(
                        yy, dy, self.matrix["layer_" + str(layer + 1)][idx], xbar, tf.constant(self.functype))
                else:
                    # pre-activation of this layer
                    ybar = tf.matmul(
                        yy, self.matrix["layer_" + str(layer + 1)][idx]) + self.bias["layer_" + str(layer + 1)][idx]
                    # _layer_1 returns (tt, zz) with zz = activation + tt, where
                    # tt is the doubled skip term; zz - tt isolates the pure
                    # activation part that the derivative ops expect
                    tt, zz = self._layer_1(
                        yy, self.matrix["layer_" + str(layer + 1)][idx], self.bias["layer_" + str(layer + 1)][idx])
                    dz = op_module.unaggregated_dy_dx(
                        zz - tt, self.matrix["layer_" + str(layer + 1)][idx], dy, ybar, tf.constant(self.functype))
                    dy2 = op_module.unaggregated_dy2_dx(
                        zz - tt, self.matrix["layer_" + str(layer + 1)][idx], dy, dy2, ybar, tf.constant(self.functype))
                    # carry the accumulated derivatives and output forward
                    dy = dz
                    yy = zz
            # materialize as numpy arrays in the scratch session
            vv = zz.eval()
            dd = dy.eval()
            d2 = dy2.eval()
    return vv, dd, d2
def _layer_0(self, x, w, b):
    """First embedding-net layer: activation(x @ w + b)."""
    pre_activation = tf.matmul(x, w) + b
    return self.activation_fn(pre_activation)

def _layer_1(self, x, w, b):
    """Subsequent embedding-net layer with the doubled skip connection:
    returns (skip, activation(x @ w + b) + skip) where skip is x
    concatenated with itself along the feature axis."""
    skip = tf.concat([x, x], axis=1)
    out = self.activation_fn(tf.matmul(x, w) + b) + skip
    return skip, out
def _save_data(self):
    """Dump every tabulated pair net to a text file named ``data_<ii>``.

    Debug helper. Nets that were never tabulated — excluded type pairs,
    or all pair-indexed nets when ``type_one_side`` is set (those use
    "filter_-1_net_*" keys) — are skipped instead of raising KeyError,
    which the unconditional lookup previously did.
    """
    for ii in range(self.ntypes * self.ntypes):
        net = "filter_" + str(ii // self.ntypes) + "_net_" + str(int(ii % self.ntypes))
        if net in self.data:
            np.savetxt('data_' + str(ii), self.data[net])
def _get_env_mat_range(self,
                       min_nbor_dist):
    """Detect the boundaries of the radial environment-matrix entries.

    Scans the per-type normalization statistics (davg/dstd) to find the
    smallest and largest normalized value the first radial channel can
    take, then widens to integers with floor/ceil.
    """
    lower = 100.0
    upper = -10.0
    switch = self._spline5_switch(min_nbor_dist, self.rcut_smth, self.rcut)
    for tt in range(self.ntypes):
        avg = self.davg[tt][0]
        std = self.dstd[tt][0]
        lower = min(lower, -avg / std)
        upper = max(upper, ((1 / min_nbor_dist) * switch - avg) / std)
    log.info('training data with lower boundary: ' + str(lower))
    log.info('training data with upper boundary: ' + str(upper))
    return math.floor(lower), math.ceil(upper)
def _spline5_switch(self,
                    xx,
                    rmin,
                    rmax):
    """Fifth-order smoothing switch: 1 below rmin, 0 at/above rmax, and a
    smooth quintic interpolation in between."""
    if xx < rmin:
        return 1
    if xx >= rmax:
        return 0
    uu = (xx - rmin) / (rmax - rmin)
    return uu*uu*uu * (-6 * uu*uu + 15 * uu - 10) + 1