Source code for deepmd.nvnmd.entrypoints.mapt

# SPDX-License-Identifier: LGPL-3.0-or-later
import logging
from typing import (
    Optional,
)

import numpy as np

from deepmd.env import (
    op_module,
    tf,
)
from deepmd.nvnmd.data.data import (
    jdata_deepmd_input_v0,
    jdata_sys,
)
from deepmd.nvnmd.utils.config import (
    nvnmd_cfg,
)
from deepmd.nvnmd.utils.fio import (
    FioDic,
)
from deepmd.nvnmd.utils.network import (
    get_sess,
)
from deepmd.nvnmd.utils.weight import (
    get_filter_type_weight,
    get_filter_weight,
    get_normalize,
    get_type_embedding_weight,
)
from deepmd.utils.sess import (
    run_sess,
)

log = logging.getLogger(__name__)


[docs]class MapTable: r"""Generate the mapping table describing the relastionship of atomic distance, cutoff function, and embedding matrix. three mapping table will be built: | :math:`r^2_{ji} \rightarrow s_{ji}` | :math:`r^2_{ji} \rightarrow h_{ji}` | :math:`r^2_{ji} \rightarrow \mathcal{G}_{ji}` where :math:`s_{ji}` is cut-off function, :math:`h_{ji} = \frac{s(r_{ji})}{r_{ji}}`, and :math:`\mathcal{G}_{ji}` is embedding matrix. The mapping funciton can be define as: | :math:`y = f(x) = y_{k} + (x - x_{k}) * dy_{k}` | :math:`y_{k} = f(x_{k})` | :math:`dy_{k} = \frac{f(x_{k+1}) - f(x_{k})}{dx}` | :math:`x_{k} \leq x < x_{k+1}` | :math:`x_{k} = k * dx` where :math:`dx` is interpolation interval. Parameters ---------- config_file input file name an .npy file containing the configuration information of NVNMD model weight_file input file name an .npy file containing the weights of NVNMD model map_file output file name an .npy file containing the mapping tables of NVNMD model References ---------- DOI: 10.1038/s41524-022-00773-z """ def __init__(self, config_file: str, weight_file: str, map_file: str): self.config_file = config_file self.weight_file = weight_file self.map_file = map_file jdata = jdata_deepmd_input_v0["nvnmd"] jdata["config_file"] = config_file jdata["weight_file"] = weight_file jdata["enable"] = True # 0 : xyz_scatter = xyz_scatter * two_embd + xyz_scatter; # Gs + 1, Gt + 0 # 1 : xyz_scatter = xyz_scatter * two_embd + two_embd ; # Gs + 0, Gt + 1 self.Gs_Gt_mode = 1 nvnmd_cfg.init_from_jdata(jdata)
[docs] def build_map(self): if self.Gs_Gt_mode == 0: self.shift_Gs = 1 self.shift_Gt = 0 if self.Gs_Gt_mode == 1: self.shift_Gs = 0 self.shift_Gt = 1 # M = nvnmd_cfg.dscp["M1"] if nvnmd_cfg.version == 0: ndim = nvnmd_cfg.dscp["ntype"] if nvnmd_cfg.version == 1: ndim = 1 # calculate grid point dic_u2s, dic_u2s_ref = self.run_u2s() dic_s2g, dic_s2g_ref = self.run_s2g() if nvnmd_cfg.version == 1: dic_t2g = self.run_t2g() dic_std = self.build_davg_dstd() # build mapping table dic_map1 = {} ## u2s u = np.reshape(dic_u2s["u"], [-1]) cfg_u2s = [[u[0], u[512], u[1] - u[0], 0, 512]] dic_map1["s"], dic_map1["s_grad"] = self.build_map_coef( cfg_u2s, u, dic_u2s["s"], dic_u2s["s_grad"], dic_u2s["s_grad_grad"], ndim, 1, ) dic_map1["h"], dic_map1["h_grad"] = self.build_map_coef( cfg_u2s, u, dic_u2s["h"], dic_u2s["h_grad"], dic_u2s["h_grad_grad"], ndim, 1, ) ## s2g dic_map2 = {} s = np.reshape(dic_s2g["s"], [-1]) cfg_s2g = [ [s[0], s[256], s[1] - s[0], 0, 256], [s[0], s[4096], s[16] - s[0], 256, 512], ] dic_map2["g"], dic_map2["g_grad"] = self.build_map_coef( cfg_s2g, s, dic_s2g["g"], dic_s2g["g_grad"], dic_s2g["g_grad_grad"], ndim, M, ) if nvnmd_cfg.version == 1: ## t2g dic_map3 = {} dic_map3["t_ebd"] = dic_t2g["t_ebd"] dic_map3["gt"] = dic_t2g["gt"] ## davg and dstd dic_map4 = {} dic_map4["davg_opp"] = dic_std["davg_opp"] dic_map4["dstd_inv"] = dic_std["dstd_inv"] # run mapping to test if jdata_sys["debug"]: dic_u2s_prd = self.mapping2(dic_u2s_ref["u"], dic_map1, cfg_u2s) dic_s2g_prd = self.mapping2(dic_s2g_ref["s"], dic_map2, cfg_s2g) self.plot_lines(dic_u2s_ref["u"], dic_u2s_prd, dic_u2s_ref) self.plot_lines(dic_s2g_ref["s"], dic_s2g_prd, dic_s2g_ref) # save self.map = {} self.map["cfg_u2s"] = cfg_u2s self.map["cfg_s2g"] = cfg_s2g self.map.update(dic_map1) self.map.update(dic_map2) if nvnmd_cfg.version == 1: self.map.update(dic_map3) self.map.update(dic_map4) # FioDic().save(self.map_file, self.map) log.info("NVNMD: finish building mapping table") return self.map
[docs] def mapping(self, x, dic_map, cfgs): r"""Evaluate value by mapping table operation of tensorflow.""" n = len(x) dic_val = {} for key in dic_map.keys(): val = dic_map[key] if isinstance(val, list): dats = [] for ii in range(len(val)): val_i = val[ii] nr = np.shape(val_i)[0] nc = np.shape(val_i)[1] // 4 dat_i = np.zeros([n, nc]) for kk in range(n): xk = x[kk] for cfg in cfgs: x0, x1, dx, N0, N1 = cfg if (xk >= x0) and (xk <= x1): break idx = np.int32(np.floor((xk - x0) / dx)) dxx = xk - idx * dx - x0 idx_k = idx + N0 dxx_k = dxx if idx_k >= N1: idx_k = N1 - 1 dxx_k = dx coef = val_i[idx_k] coef = np.reshape(coef, [nc, 4]) a, b, c, d = coef[:, 0], coef[:, 1], coef[:, 2], coef[:, 3] dat_i[kk, :] = d + (c + (b + a * dxx_k) * dxx_k) * dxx_k dats.append(dat_i) dic_val[key] = dats return dic_val
[docs] def mapping2(self, x, dic_map, cfgs): r"""Evaluate value by mapping table of numpy.""" tf.reset_default_graph() t_x = tf.placeholder(tf.float64, [None, 1], "t_x") t_table = tf.placeholder(tf.float64, [None, None], "t_table") t_table_grad = tf.placeholder(tf.float64, [None, None], "t_table_grad") t_table_info = tf.placeholder(tf.float64, [None], "t_table_info") t_y = op_module.map_flt_nvnmd(t_x, t_table, t_table_grad, t_table_info) sess = get_sess() # n = len(x) dic_val = {} for key in dic_map.keys(): val = dic_map[key] is_list = isinstance(val, list) if not is_list: val = [val] dats = [] for ii in range(len(val)): val_i = val[ii] feed_dict = { t_x: x, t_table: val_i, t_table_grad: val_i * 0.0, t_table_info: np.reshape(np.array(cfgs), [-1]), } dat_i = run_sess(sess, t_y, feed_dict=feed_dict) dat_i = np.reshape(dat_i, [n, -1]) dats.append(dat_i) if not is_list: dats = dats[0] dic_val[key] = dats return dic_val
[docs] def plot_lines(self, x, dic1, dic2=None): r"""Plot lines to see accuracy.""" pass
[docs] def build_map_coef(self, cfgs, x, ys, grads, grad_grads, Nr, Nc): r"""Build mapping table coefficient cfgs: cfg list cfg = x0, x1, dx. coef4: a x^3 + b x^2 + c x + d = y: / d = y0 | c = y0' | b = (3 y1 - dx dy' - 2dx y0' - 3y0) / dx^2 \ a = (dx y1' - 2 y1 + dx y0' + 2 y0) / dx^3 """ coefs = [] coef_grads = [] is_list = isinstance(ys, list) for ii in range(Nr): if is_list: y_i = ys[ii] grad_i = grads[ii] grad_grad_i = grad_grads[ii] else: y_i = ys grad_i = grads grad_grad_i = grad_grads # coef_i = [] coef_grad_i = [] for jj in range(Nc): y_ij = y_i[:, jj] grad_ij = grad_i[:, jj] grad_grad_ij = grad_grad_i[:, jj] coef_ij = self.cal_coef4(cfgs, x, y_ij, grad_ij) coef_grad_ij = self.cal_coef4(cfgs, x, grad_ij, grad_grad_ij) coef_i.append(coef_ij) coef_grad_i.append(coef_grad_ij) coef_i = np.concatenate(coef_i, axis=1) coef_grad_i = np.concatenate(coef_grad_i, axis=1) coefs.append(coef_i) coef_grads.append(coef_grad_i) if not is_list: coefs = coefs[0] coef_grads = coef_grads[0] return coefs, coef_grads
[docs] def cal_coef4(self, cfgs, x, y, dy): r"""Build mapping table coefficient for one line coef4: a x^3 + b x^2 + c x + d = y: / d = y0 | c = y0' | b = (3 y1 - dx dy' - 2dx y0' - 3y0) / dx^2 \ a = (dx y1' - 2 y1 + dx y0' + 2 y0) / dx^3. """ x = np.reshape(x, [-1]) coefs = [] for cfg in cfgs: x0, x1, dx, N0, N1 = cfg Nd = N1 - N0 diff_x = np.abs((x - x0) - np.round((x - x0) / dx) * dx) idx = np.logical_and(np.logical_and(x >= x0, x <= x1), diff_x < 1.0e-6) y0 = y[idx][:-1] y1 = y[idx][1:] dy0 = dy[idx][:-1] dy1 = dy[idx][1:] y0 = y0[:Nd] y1 = y1[:Nd] dy0 = dy0[:Nd] dy1 = dy1[:Nd] # a = (dx * dy1 - 2 * y1 + dx * dy0 + 2 * y0) / dx**3 b = (3 * y1 - dx * dy1 - 2 * dx * dy0 - 3 * y0) / dx**2 c = dy0 d = y0 coef = np.concatenate([a, b, c, d]) coef = np.transpose(np.reshape(coef, [4, -1])) coefs.append(coef) coefs = np.concatenate(coefs) return coefs
[docs] def build_grad(self, x, y, Nr, Nc): r""": Build gradient of tensor y of x.""" is_list = isinstance(y, list) grads = [] grad_grads = [] for ii in range(Nr): y_i = y[ii] if is_list else y grad_i = [] grad_grad_i = [] for jj in range(Nc): y_ij = y_i[:, jj] grad_ij = tf.gradients(y_ij, x)[0] grad_grad_ij = tf.gradients(grad_ij, x)[0] grad_i.append(grad_ij) grad_grad_i.append(grad_grad_ij) grad_i = tf.concat(grad_i, axis=1) grad_grad_i = tf.concat(grad_grad_i, axis=1) grads.append(grad_i) grad_grads.append(grad_grad_i) if not is_list: grads = grads[0] grad_grads = grad_grads[0] return grads, grad_grads
[docs] def build_u2s(self, r2): r"""Build tensor s, s=s(r2).""" rmin = nvnmd_cfg.dscp["rcut_smth"] rmax = nvnmd_cfg.dscp["rcut"] dmin = nvnmd_cfg.dscp["dmin"] min_dist = rmin if "train_attr.min_nbor_dist" in nvnmd_cfg.weight.keys(): min_dist = nvnmd_cfg.weight["train_attr.min_nbor_dist"] if dmin > 1e-6: min_dist = dmin min_dist = 0.5 if (min_dist > 0.5) else (min_dist - 0.1) # r = tf.sqrt(r2) r_ = tf.clip_by_value(r, rmin, rmax) r__ = tf.clip_by_value(r, min_dist, rmax) uu = (r_ - rmin) / (rmax - rmin) vv = uu * uu * uu * (-6 * uu * uu + 15 * uu - 10) + 1 if nvnmd_cfg.version == 0: ntype = nvnmd_cfg.dscp["ntype"] avg, std = get_normalize(nvnmd_cfg.weight) avg, std = np.float64(avg), np.float64(std) sl, hl = [], [] for tt in range(ntype): s = vv / r__ h = s / r__ s = tf.reshape(s, [-1, 1]) h = tf.reshape(h, [-1, 1]) s = (s - avg[tt, 0]) / std[tt, 0] h = h / std[tt, 1] sl.append(s) hl.append(h) return sl, hl if nvnmd_cfg.version == 1: s = vv / r__ h = s / r__ s = tf.reshape(s, [-1, 1]) h = tf.reshape(h, [-1, 1]) return [s], [h]
[docs] def build_u2s_grad(self): r"""Build gradient of s with respect to u (r^2).""" if nvnmd_cfg.version == 0: ndim = nvnmd_cfg.dscp["ntype"] if nvnmd_cfg.version == 1: ndim = 1 # dic_ph = {} dic_ph["u"] = tf.placeholder(tf.float64, [None, 1], "t_u") dic_ph["s"], dic_ph["h"] = self.build_u2s(dic_ph["u"]) dic_ph["s_grad"], dic_ph["s_grad_grad"] = self.build_grad( dic_ph["u"], dic_ph["s"], ndim, 1 ) dic_ph["h_grad"], dic_ph["h_grad_grad"] = self.build_grad( dic_ph["u"], dic_ph["h"], ndim, 1 ) return dic_ph
[docs] def run_u2s(self): r"""Build u->s graph and run it to get value of mapping table.""" ntype = nvnmd_cfg.dscp["ntype"] if nvnmd_cfg.version == 0: ndim = nvnmd_cfg.dscp["ntype"] if nvnmd_cfg.version == 1: ndim = 1 avg, std = get_normalize(nvnmd_cfg.weight) avg, std = np.float64(avg), np.float64(std) rc_max = nvnmd_cfg.dscp["rc_max"] tf.reset_default_graph() dic_ph = self.build_u2s_grad() sess = get_sess() # N = NUM_MAPT N = 512 N2 = int(rc_max**2) # N+1 ranther than N for calculating defference keys = list(dic_ph.keys()) vals = list(dic_ph.values()) u = N2 * np.reshape(np.arange(0, N + 1) / N, [-1, 1]) res_lst = run_sess(sess, vals, feed_dict={dic_ph["u"]: u}) res_dic = dict(zip(keys, res_lst)) u2 = N2 * np.reshape(np.arange(0, N * 16 + 1) / (N * 16), [-1, 1]) res_lst2 = run_sess(sess, vals, feed_dict={dic_ph["u"]: u2}) res_dic2 = dict(zip(keys, res_lst2)) # reference for commpare # change value for tt in range(ndim): res_dic["s"][tt][0] = -avg[tt, 0] / std[tt, 0] res_dic["s_grad"][tt][0] = 0 res_dic["s_grad_grad"][tt][0] = 0 res_dic["h"][tt][0] = 0 res_dic["h_grad"][tt][0] = 0 res_dic["h_grad_grad"][tt][0] = 0 # res_dic2["s"][tt][0] = -avg[tt, 0] / std[tt, 0] res_dic2["s_grad"][tt][0] = 0 res_dic2["s_grad_grad"][tt][0] = 0 res_dic2["h"][tt][0] = 0 res_dic2["h_grad"][tt][0] = 0 res_dic2["h_grad_grad"][tt][0] = 0 sess.close() return res_dic, res_dic2
[docs] def build_s2g(self, s): r"""Build s->G s is switch function G is embedding net output. """ if nvnmd_cfg.version == 0: ntype = nvnmd_cfg.dscp["ntype"] if nvnmd_cfg.version == 1: ntype = 1 # xyz_scatters = [] for tt2 in range(ntype): wbs = [get_filter_weight(nvnmd_cfg.weight, tt2, ll) for ll in range(1, 5)] xyz_scatter = self.build_embedding_net(s, wbs) xyz_scatters.append(xyz_scatter) return xyz_scatters
[docs] def build_s2g_grad(self): r"""Build gradient of G with respect to s.""" M1 = nvnmd_cfg.dscp["M1"] # if nvnmd_cfg.version == 0: ntypex = nvnmd_cfg.dscp["ntypex"] ntype = nvnmd_cfg.dscp["ntype"] ndim = ntypex * ntype shift = 0 if nvnmd_cfg.version == 1: ndim = 1 shift = self.shift_Gs # dic_ph = {} dic_ph["s"] = tf.placeholder(tf.float64, [None, 1], "t_s") dic_ph["g"] = [g + shift for g in self.build_s2g(dic_ph["s"])] dic_ph["g_grad"], dic_ph["g_grad_grad"] = self.build_grad( dic_ph["s"], dic_ph["g"], ndim, M1 ) return dic_ph
[docs] def run_s2g(self): r"""Build s-> graph and run it to get value of mapping table.""" smin = nvnmd_cfg.dscp["smin"] smax = nvnmd_cfg.dscp["smax"] # fix the bug: if model initial mode is 'init_from_model', # we need dmin to calculate smin and smax in mapt.py if smin == -2: davg, dstd = get_normalize(nvnmd_cfg.weight) nvnmd_cfg.get_s_range(davg, dstd) smin = nvnmd_cfg.dscp["smin"] smax = nvnmd_cfg.dscp["smax"] tf.reset_default_graph() dic_ph = self.build_s2g_grad() sess = get_sess() N = 4096 N2 = 16 log.info(f"the range of s is [{smin}, {smax}]") # check if (smax - smin) > 16.0: log.warning("the range of s is over the limit (smax - smin) > 16.0") prec = N / N2 # the lower limit of switch function if nvnmd_cfg.version == 0: smin_ = np.floor(smin * prec - 1) / prec if nvnmd_cfg.version == 1: smin_ = 0 # keys = list(dic_ph.keys()) vals = list(dic_ph.values()) s = N2 * np.reshape(np.arange(0, N + 1) / N, [-1, 1]) + smin_ res_lst = run_sess(sess, vals, feed_dict={dic_ph["s"]: s}) res_dic = dict(zip(keys, res_lst)) s2 = N2 * np.reshape(np.arange(0, N * 16 + 1) / (N * 16), [-1, 1]) + smin_ res_lst2 = run_sess(sess, vals, feed_dict={dic_ph["s"]: s2}) res_dic2 = dict(zip(keys, res_lst2)) sess.close() return res_dic, res_dic2
[docs] def build_t2g(self): r"""Build t->G t is chemical species of center atom and neighbor atom G is embedding net output of type. """ ntype = nvnmd_cfg.dscp["ntype"] filter_precision = tf.float64 types = tf.convert_to_tensor(list(range(ntype)), dtype=tf.int32) ebd_type = tf.cast( tf.one_hot(tf.cast(types, dtype=tf.int32), int(ntype)), filter_precision, ) ebd_type = tf.reshape(ebd_type, [-1, ntype]) # type -> type_embedding dic_ph = {} dic_ph["t_one_hot"] = ebd_type wbs = [get_type_embedding_weight(nvnmd_cfg.weight, ll) for ll in range(1, 5)] ebd_type = self.build_embedding_net(dic_ph["t_one_hot"], wbs, None) last_type = tf.cast(tf.zeros([1, ebd_type.shape[1]]), filter_precision) ebd_type = tf.concat([ebd_type, last_type], 0) dic_ph["t_ebd"] = ebd_type # type_embedding of i, j atoms -> two_side_type_embedding type_embedding = dic_ph["t_ebd"] padding_ntypes = type_embedding.shape[0] type_embedding_nei = tf.tile( tf.reshape(type_embedding, [1, padding_ntypes, -1]), [padding_ntypes, 1, 1], ) # (ntypes) * ntypes * Y type_embedding_center = tf.tile( tf.reshape(type_embedding, [padding_ntypes, 1, -1]), [1, padding_ntypes, 1], ) # ntypes * (ntypes) * Y two_side_type_embedding = tf.concat( [type_embedding_nei, type_embedding_center], -1 ) # ntypes * ntypes * (Y+Y) two_side_type_embedding = tf.reshape( two_side_type_embedding, [-1, two_side_type_embedding.shape[-1]], ) # see se_atten.py in dp wbs = [get_filter_type_weight(nvnmd_cfg.weight, ll) for ll in range(1, 5)] dic_ph["gt"] = ( self.build_embedding_net(two_side_type_embedding, wbs) + self.shift_Gt ) return dic_ph
[docs] def run_t2g(self): r"""Build t-> graph and run it to get value of mapping table.""" tf.reset_default_graph() dic_ph = self.build_t2g() sess = get_sess() # keys = list(dic_ph.keys()) vals = list(dic_ph.values()) # res_lst = run_sess(sess, vals, feed_dict={}) res_dic = dict(zip(keys, res_lst)) sess.close() return res_dic
[docs] def build_embedding_net(self, xx, wbs, activation_fn=tf.tanh): for ll in range(len(wbs)): # weight and bias w, b, t = wbs[ll] if (w is None) or (b is None): break w, b = np.float64(w), np.float64(b) # layer if activation_fn is None: hidden = tf.matmul(xx, w) + b else: hidden = activation_fn(tf.matmul(xx, w) + b) # resnet shw = w.shape if shw[1] == shw[0]: if t is None: xx += hidden else: xx += hidden * t elif shw[1] == shw[0] * 2: if t is None: xx = tf.concat([xx, xx], 1) + hidden else: xx = tf.concat([xx, xx], 1) + hidden * t else: xx = hidden return xx
[docs] def build_davg_dstd(self): ntype = nvnmd_cfg.dscp["ntype"] davg, dstd = get_normalize(nvnmd_cfg.weight) # res_dic = {} res_dic["davg_opp"] = np.array([-davg[tt, 0:4] for tt in range(ntype)]) res_dic["dstd_inv"] = np.array([1.0 / dstd[tt, 0:4] for tt in range(ntype)]) return res_dic
[docs]def mapt( *, nvnmd_config: Optional[str] = "nvnmd/config.npy", nvnmd_weight: Optional[str] = "nvnmd/weight.npy", nvnmd_map: Optional[str] = "nvnmd/map.npy", **kwargs, ): # build mapping table mapObj = MapTable(nvnmd_config, nvnmd_weight, nvnmd_map) mapObj.build_map()