Source code for deepmd.utils.neighbor_stat

# SPDX-License-Identifier: LGPL-3.0-or-later
import logging
import math
from typing import (
    Iterator,
    Optional,
    Tuple,
)

import numpy as np

from deepmd.env import (
    GLOBAL_NP_FLOAT_PRECISION,
    GLOBAL_TF_FLOAT_PRECISION,
    default_tf_session_config,
    tf,
)
from deepmd.utils.batch_size import (
    AutoBatchSize,
)
from deepmd.utils.data_system import (
    DeepmdDataSystem,
)
from deepmd.utils.nlist import (
    extend_coord_with_ghosts,
)
from deepmd.utils.sess import (
    run_sess,
)

log = logging.getLogger(__name__)


class NeighborStatOP:
    """Graph builder that measures neighbor statistics of atomic frames.

    Parameters
    ----------
    ntypes
        The number of atom types.
    rcut
        The cut-off radius.
    distinguish_types : bool
        If False, treat all types as a single type.
    """

    def __init__(
        self,
        ntypes: int,
        rcut: float,
        distinguish_types: bool,
    ) -> None:
        super().__init__()
        self.rcut = rcut
        self.ntypes = ntypes
        self.distinguish_types = distinguish_types

    def build(
        self,
        coord: tf.Tensor,
        atype: tf.Tensor,
        cell: tf.Tensor,
        pbc: tf.Tensor,
    ) -> Tuple[tf.Tensor, tf.Tensor]:
        """Build the ops computing the minimal pair distance and neighbor counts.

        Parameters
        ----------
        coord
            The coordinates of atoms; reshaped internally to
            (nframes, nloc * 3).
        atype
            The atom types, indexed as (nframes, nloc). Negative values mark
            virtual atoms, which are excluded from every statistic.
        cell
            The cell, forwarded to ``extend_coord_with_ghosts``.
        pbc
            Periodic-boundary flag, forwarded to ``extend_coord_with_ghosts``.

        Returns
        -------
        tf.Tensor
            The minimal squared distance between two atoms, in the shape of
            (nframes,). It is ``inf`` for a frame without any valid pair.
        tf.Tensor
            The maximal number of neighbors within ``rcut`` over the local
            atoms, in the shape of (nframes, ntypes) when types are
            distinguished and (nframes, 1) otherwise.
        """
        # generated by GitHub Copilot, converted from PT codes
        nframes = tf.shape(coord)[0]
        # the intermediate 3-column view is only needed to read off nloc
        nloc = tf.shape(tf.reshape(coord, [nframes, -1, 3]))[1]
        coord = tf.reshape(coord, [nframes, nloc * 3])
        extend_coord, extend_atype, _ = extend_coord_with_ghosts(
            coord, atype, cell, self.rcut, pbc
        )
        flat_extended = tf.reshape(extend_coord, [nframes, -1])
        nall = tf.shape(flat_extended)[1] // 3
        # the first nloc atoms of the extended system are the local ones
        xyz_all = tf.reshape(flat_extended, [nframes, -1, 3])
        xyz_loc = tf.reshape(flat_extended[:, : nloc * 3], [nframes, -1, 3])
        # displacement vectors, shape: nframes, nloc, nall, 3
        displacement = xyz_all[:, None, :, :] - xyz_loc[:, :, None, :]

        # pairs that must not contribute: an atom with itself (the diagonal)
        # and any pair involving a virtual type (<0)
        is_virtual_i = tf.less(atype, 0)
        is_virtual_j = tf.less(extend_atype, 0)
        self_pair = tf.tile(
            tf.eye(nloc, nall, dtype=tf.bool)[None, :, :], [nframes, 1, 1]
        )
        excluded = (
            self_pair
            | tf.tile(is_virtual_i[:, :, None], [1, 1, nall])
            | tf.tile(is_virtual_j[:, None, :], [1, nloc, 1])
        )
        # excluded pairs get an infinite distance so they never win the
        # minimum and never fall inside the cut-off
        infinity = tf.tile(
            tf.constant(
                float("inf"), dtype=GLOBAL_TF_FLOAT_PRECISION, shape=[1, 1, 1]
            ),
            [nframes, nloc, nall],
        )
        rr2 = tf.where(
            excluded, infinity, tf.reduce_sum(tf.square(displacement), axis=-1)
        )
        min_rr2 = tf.reduce_min(rr2, axis=(1, 2))

        # count the number of neighbors
        within_rcut = rr2 < self.rcut**2
        if self.distinguish_types:
            n_columns = self.ntypes
            per_type_counts = [
                tf.reduce_sum(
                    tf.cast(
                        within_rcut & (tf.equal(extend_atype, ii))[:, None, :],
                        tf.int32,
                    ),
                    axis=-1,
                )
                for ii in range(self.ntypes)
            ]
            # shape: nframes, nloc, ntypes
            nnei = tf.stack(per_type_counts, axis=-1)
        else:
            n_columns = 1
            # virtual types (<0) are not counted
            real_neighbor = within_rcut & tf.greater_equal(extend_atype, 0)[
                :, None, :
            ]
            nnei = tf.reshape(
                tf.reduce_sum(tf.cast(real_neighbor, tf.int32), axis=-1),
                [nframes, nloc, 1],
            )
        # nnei: nframes, nloc, n_columns
        # virtual type i (<0) are not counted
        nnei = tf.where(
            tf.tile(is_virtual_i[:, :, None], [1, 1, n_columns]),
            tf.zeros_like(nnei, dtype=tf.int32),
            nnei,
        )
        max_nnei = tf.reduce_max(nnei, axis=1)
        return min_rr2, max_nnei
class NeighborStat:
    """Class for getting training data information.

    It loads data from a DeepmdDataSystem object and measures the nearest
    neighbor distance between atoms and the maximal neighbor size of atoms.

    Parameters
    ----------
    ntypes
        The num of atom types
    rcut
        The cut-off radius
    one_type : bool, optional, default=False
        Treat all types as a single type.
    """

    def __init__(
        self,
        ntypes: int,
        rcut: float,
        one_type: bool = False,
    ) -> None:
        """Constructor."""
        self.ntypes = ntypes
        self.rcut = rcut
        self.mixed_type = one_type
        self.auto_batch_size = AutoBatchSize()
        self.neighbor_stat = NeighborStatOP(ntypes, rcut, not one_type)
        self.place_holders = {}
        # the statistics ops live in a private graph with their own session
        with tf.Graph().as_default() as sub_graph:
            self.op = self.build()
        self.sub_sess = tf.Session(graph=sub_graph, config=default_tf_session_config)

    def get_stat(self, data: DeepmdDataSystem) -> Tuple[float, np.ndarray]:
        """Get the data statistics of the training data.

        The statistics include the nearest neighbor distance between atoms
        and the maximal neighbor size of atoms.

        Parameters
        ----------
        data
            Class for manipulating many data systems. It is implemented with
            the help of DeepmdData.

        Returns
        -------
        min_nbor_dist
            The nearest distance between neighbor atoms. The search starts
            from a squared distance of 100.0, so the returned value is never
            larger than 10.0.
        max_nbor_size
            An array with ntypes integers (a single integer when all types
            are treated as one), denotes the actual achieved max sel

        Raises
        ------
        RuntimeError
            If the squared distance between two atoms of a data set is zero
            within an absolute tolerance of 1e-6.
        """
        # squared distances are tracked in the loop; sqrt is taken once at the end
        min_nbor_dist = 100.0
        max_nbor_size = np.zeros(1 if self.mixed_type else self.ntypes, dtype=int)

        for mn, dt, jj in self.iterator(data):
            if np.isinf(dt):
                log.warning(
                    "Atoms with no neighbors found in %s. Please make sure it's what you expected.",
                    jj,
                )
            if dt < min_nbor_dist:
                # A relative tolerance is meaningless against 0.0 (it only
                # matches an exact zero), so an absolute tolerance is needed
                # to catch nearly overlapping atoms as well.
                if math.isclose(dt, 0.0, abs_tol=1e-6):
                    # it's unexpected that the distance between two atoms is zero
                    # zero distance will cause nan (#874)
                    raise RuntimeError(
                        f"Some atoms are overlapping in {jj}. Please check your"
                        " training data to remove duplicated atoms."
                    )
                min_nbor_dist = dt
            max_nbor_size = np.maximum(mn, max_nbor_size)

        # do sqrt in the final
        min_nbor_dist = math.sqrt(min_nbor_dist)
        log.info("training data with min nbor dist: %s", min_nbor_dist)
        log.info("training data with max nbor size: %s", max_nbor_size)
        return min_nbor_dist, max_nbor_size

    def build(self) -> Tuple[tf.Tensor, tf.Tensor]:
        """Build the graph.

        Creates the ``coord``, ``box``, ``type`` and ``pbc`` placeholders and
        connects them to the neighbor statistics operator.

        Returns
        -------
        tf.Tensor
            The minimal squared distance between two atoms, in the shape of (nframes,)
        tf.Tensor
            The maximal number of neighbors
        """
        for ii in ["coord", "box"]:
            self.place_holders[ii] = tf.placeholder(
                GLOBAL_NP_FLOAT_PRECISION, [None, None], name="t_" + ii
            )
        self.place_holders["type"] = tf.placeholder(
            tf.int32, [None, None], name="t_type"
        )
        # scalar flag shared by every frame of a batch
        self.place_holders["pbc"] = tf.placeholder(tf.bool, [], name="t_pbc")
        return self.neighbor_stat.build(
            self.place_holders["coord"],
            self.place_holders["type"],
            self.place_holders["box"],
            self.place_holders["pbc"],
        )

    def iterator(
        self, data: DeepmdDataSystem
    ) -> Iterator[Tuple[np.ndarray, float, str]]:
        """Produce data.

        Every set directory of every data system is loaded and evaluated in
        automatically sized batches; one result is yielded per set.

        Parameters
        ----------
        data
            The data system

        Yields
        ------
        np.ndarray
            The maximal number of neighbors
        float
            The squared minimal distance between two atoms
        str
            The directory of the data system
        """
        for ii in range(len(data.system_dirs)):
            data_set = data.data_systems[ii]
            for jj in data_set.dirs:
                data_set_data = data_set._load_set(jj)
                minrr2, max_nnei = self.auto_batch_size.execute_all(
                    self._execute,
                    data_set_data["coord"].shape[0],
                    data_set.get_natoms(),
                    data_set_data["coord"],
                    data_set_data["type"],
                    data_set_data["box"],
                    data_set.pbc,
                )
                # reduce the per-frame results to one value per set
                yield np.max(max_nnei, axis=0), np.min(minrr2), jj

    def _execute(
        self,
        coord: np.ndarray,
        atype: np.ndarray,
        box: Optional[np.ndarray],
        pbc: bool,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Execute the operation.

        Parameters
        ----------
        coord
            The coordinates of atoms.
        atype
            The atom types.
        box
            The box.
        pbc
            Whether the box is periodic.

        Returns
        -------
        np.ndarray
            The minimal squared distance between two atoms of each frame.
        np.ndarray
            The maximal number of neighbors of each frame.
        """
        feed_dict = {
            self.place_holders["coord"]: coord,
            self.place_holders["type"]: atype,
            self.place_holders["box"]: box,
            self.place_holders["pbc"]: pbc,
        }
        minrr2, max_nnei = run_sess(self.sub_sess, self.op, feed_dict=feed_dict)
        return minrr2, max_nnei