deepmd.tf.utils

Contents

deepmd.tf.utils#

Submodules#

Classes#

DeepmdData

Class for a data system.

DeepmdDataSystem

Class for manipulating many data systems.

LearningRateSchedule

TensorFlow wrapper for BaseLR.

PairTab

Pairwise tabulated potential.

Plugin

A class to register and restore plugins.

PluginVariant

A class to remove type from input arguments.

Package Contents#

class deepmd.tf.utils.DeepmdData(sys_path: str, set_prefix: str = 'set', shuffle_test: bool = True, type_map: list[str] | None = None, optional_type_map: bool = True, modifier: Any | None = None, trn_all_set: bool = False, sort_atoms: bool = True)[source]#

Class for a data system.

It loads data from hard disk, and maintains the data as a data_dict

Parameters:
sys_path

Path to the data system

set_prefix

Prefix for the directories of different sets

shuffle_test

If the test data are shuffled

type_map

Gives the name of different atom types

optional_type_map

If the type_map.raw in each system is optional

modifier

Data modifier that has the method modify_data

trn_all_set

[DEPRECATED] Deprecated. Now all sets are trained and tested.

sort_atomsbool

Sort atoms by atom types. Required to enable when the data is directly fed to descriptors except mixed types.

dirs#
mixed_type#
atom_type#
natoms#
type_map#
pbc = True#
enforce_type_map = False#
sort_atoms = True#
idx_map#
data_dict#
set_count = 0#
iterator = 0#
shuffle_test = True#
modifier = None#
nframes#
prefix_sum#
use_modifier_cache = True#
add(key: str, ndof: int, atomic: bool = False, must: bool = False, high_prec: bool = False, type_sel: list[int] | None = None, repeat: int = 1, default: float = 0.0, dtype: numpy.dtype | None = None, output_natoms_for_type_sel: bool = False) DeepmdData[source]#

Add a data item that to be loaded.

Parameters:
key

The key of the item. The corresponding data is stored in sys_path/set.*/key.npy

ndof

The number of dof

atomic

The item is an atomic property. If False, the size of the data should be nframes x ndof If True, the size of data should be nframes x natoms x ndof

must

The data file sys_path/set.*/key.npy must exist. If must is False and the data file does not exist, the data_dict[find_key] is set to 0.0

high_prec

Load the data and store in float64, otherwise in float32

type_sel

Select certain type of atoms

repeat

The data will be repeated repeat times.

defaultfloat, default=0.

default value of data

dtypenp.dtype, optional

the dtype of data, overwrites high_prec if provided

output_natoms_for_type_selbool, optional

if True and type_sel is True, the atomic dimension will be natoms instead of nsel

reduce(key_out: str, key_in: str) DeepmdData[source]#

Generate a new item from the reduction of another atom.

Parameters:
key_out

The name of the reduced item

key_in

The name of the data item to be reduced

get_data_dict() dict[source]#

Get the data_dict.

check_batch_size(batch_size: int) bool[source]#

Check if the system can get a batch of data with batch_size frames.

check_test_size(test_size: int) bool[source]#

Check if the system can get a test dataset with test_size frames.

get_item_torch(index: int, num_worker: int = 1) dict[source]#

Get a single frame data . The frame is picked from the data system by index. The index is coded across all the sets.

Parameters:
index

index of the frame

num_worker

number of workers for parallel data modification

get_item_paddle(index: int, num_worker: int = 1) dict[source]#

Get a single frame data . The frame is picked from the data system by index. The index is coded across all the sets. Same with PyTorch backend.

Parameters:
index

index of the frame

num_worker

number of workers for parallel data modification

get_batch(batch_size: int) dict[source]#

Get a batch of data with batch_size frames. The frames are randomly picked from the data system.

Parameters:
batch_size

size of the batch

get_test(ntests: int = -1) dict[source]#

Get the test data with ntests frames.

Parameters:
ntests

Size of the test data set. If ntests is -1, all test data will be get.

get_ntypes() int[source]#

Number of atom types in the system.

get_type_map() list[str][source]#

Get the type map.

get_atom_type() list[int][source]#

Get atom types.

get_numb_set() int[source]#

Get number of training sets.

get_numb_batch(batch_size: int, set_idx: int) int[source]#

Get the number of batches in a set.

get_sys_numb_batch(batch_size: int) int[source]#

Get the number of batches in the data system.

get_natoms() int[source]#

Get number of atoms.

get_natoms_vec(ntypes: int) numpy.ndarray[source]#

Get number of atoms and number of atoms in different types.

Parameters:
ntypes

Number of types (may be larger than the actual number of types in the system).

Returns:
natoms

natoms[0]: number of local atoms natoms[1]: total number of atoms held by this processor natoms[i]: 2 <= i < Ntypes+2, number of type i atoms

get_single_frame(index: int, num_worker: int) dict[source]#

Orchestrates loading a single frame efficiently using memmap.

preload_and_modify_all_data_torch(num_worker: int) None[source]#

Preload all frames and apply modifier to cache them.

This method is useful when use_modifier_cache is True and you want to avoid applying the modifier repeatedly during training.

avg(key: str) float[source]#

Return the average value of an item.

_idx_map_sel(atom_type: numpy.ndarray, type_sel: list[int]) numpy.ndarray[source]#
_get_natoms_2(ntypes: int) tuple[int, numpy.ndarray][source]#
_get_memmap(path: deepmd.utils.path.DPPath) numpy.memmap[source]#

Get or create a memory-mapped object for a given npy file. Uses file path and modification time as cache keys to detect file changes and invalidate cache when files are modified.

_get_subdata(data: dict[str, Any], idx: numpy.ndarray | None = None) dict[str, Any][source]#
_load_batch_set(set_name: deepmd.utils.path.DPPath) None[source]#
reset_get_batch() None[source]#
_load_test_set(shuffle_test: bool) None[source]#
_shuffle_data(data: dict[str, Any]) dict[str, Any][source]#
_get_nframes(set_name: deepmd.utils.path.DPPath | str) int[source]#
reformat_data_torch(data: dict[str, Any]) dict[str, Any][source]#

Modify the data format for the requirements of Torch backend.

Parameters:
data

original data

_load_set(set_name: deepmd.utils.path.DPPath) dict[str, Any][source]#
_load_data(set_name: str, key: str, nframes: int, ndof_: int, atomic: bool = False, must: bool = True, repeat: int = 1, high_prec: bool = False, type_sel: list[int] | None = None, default: float = 0.0, dtype: numpy.dtype | None = None, output_natoms_for_type_sel: bool = False) numpy.ndarray[source]#
_load_single_data(set_dir: deepmd.utils.path.DPPath, key: str, frame_idx: int, set_nframes: int) tuple[numpy.float32, numpy.ndarray][source]#

Loads and processes data for a SINGLE frame from a SINGLE key, fully replicating the logic from the original _load_data method.

Parameters:
set_dirDPPath

The directory path of the set

keystr

The key name of the data to load

frame_idxint

The local frame index within the set

set_nframesint

The total number of frames in this set (to avoid redundant _get_nframes calls)

_load_type(sys_path: deepmd.utils.path.DPPath) numpy.ndarray[source]#
_load_type_mix(set_name: deepmd.utils.path.DPPath) numpy.ndarray[source]#
_make_idx_map(atom_type: numpy.ndarray) numpy.ndarray[source]#
_load_type_map(sys_path: deepmd.utils.path.DPPath) list[str] | None[source]#
_check_pbc(sys_path: deepmd.utils.path.DPPath) bool[source]#
_check_mode(set_path: deepmd.utils.path.DPPath) bool[source]#
static _create_memmap(path_str: str, mtime_str: str) numpy.memmap[source]#

A cached helper function to create memmap objects. Using lru_cache to limit the number of open file handles.

Parameters:
path_str

The file path as a string.

mtime_str

The modification time as a string, used for cache invalidation.

class deepmd.tf.utils.DeepmdDataSystem(systems: list[str], batch_size: int, test_size: int, rcut: float | None = None, set_prefix: str = 'set', shuffle_test: bool = True, type_map: list[str] | None = None, optional_type_map: bool = True, modifier: Any | None = None, trn_all_set: bool = False, sys_probs: list[float] | None = None, auto_prob_style: str = 'prob_sys_size', sort_atoms: bool = True)[source]#

Class for manipulating many data systems.

It is implemented with the help of DeepmdData

system_dirs#
nsystems#
data_systems = []#
batch_size#
mixed_systems = False#
sys_ntypes#
natoms = []#
natoms_vec = []#
nbatches = []#
type_map = []#
test_size#
pick_idx = 0#
sys_probs = None#
_load_test(ntests: int = -1) None[source]#
property default_mesh: list[numpy.ndarray]#

Mesh for each system.

compute_energy_shift(rcond: float | None = None, key: str = 'energy') tuple[numpy.ndarray, numpy.ndarray][source]#
add_dict(adict: dict[str, dict[str, Any]]) None[source]#

Add items to the data system by a dict. adict should have items like .. code-block:: python.

adict[key] = {

“ndof”: ndof, “atomic”: atomic, “must”: must, “high_prec”: high_prec, “type_sel”: type_sel, “repeat”: repeat,

}

For the explanation of the keys see add

add_data_requirements(data_requirements: list[deepmd.utils.data.DataRequirementItem]) None[source]#

Add items to the data system by a list of DataRequirementItem.

add(key: str, ndof: int, atomic: bool = False, must: bool = False, high_prec: bool = False, type_sel: list[int] | None = None, repeat: int = 1, default: float = 0.0, dtype: numpy.dtype | None = None, output_natoms_for_type_sel: bool = False) None[source]#

Add a data item that to be loaded.

Parameters:
key

The key of the item. The corresponding data is stored in sys_path/set.*/key.npy

ndof

The number of dof

atomic

The item is an atomic property. If False, the size of the data should be nframes x ndof If True, the size of data should be nframes x natoms x ndof

must

The data file sys_path/set.*/key.npy must exist. If must is False and the data file does not exist, the data_dict[find_key] is set to 0.0

high_prec

Load the data and store in float64, otherwise in float32

type_sel

Select certain type of atoms

repeat

The data will be repeated repeat times.

default, default=0.

Default value of data

dtype

The dtype of data, overwrites high_prec if provided

output_natoms_for_type_selbool

If True and type_sel is True, the atomic dimension will be natoms instead of nsel

reduce(key_out: str, key_in: str) None[source]#

Generate a new item from the reduction of another atom.

Parameters:
key_out

The name of the reduced item

key_in

The name of the data item to be reduced

get_data_dict(ii: int = 0) dict[source]#
set_sys_probs(sys_probs: list[float] | None = None, auto_prob_style: str = 'prob_sys_size') None[source]#
get_batch(sys_idx: int | None = None) dict[source]#

Get a batch of data from the data systems.

Parameters:
sys_idxint

The index of system from which the batch is get. If sys_idx is not None, sys_probs and auto_prob_style are ignored If sys_idx is None, automatically determine the system according to sys_probs or auto_prob_style, see the following. This option does not work for mixed systems.

Returns:
dict

The batch data

get_batch_standard(sys_idx: int | None = None) dict[source]#

Get a batch of data from the data systems in the standard way.

Parameters:
sys_idxint

The index of system from which the batch is get. If sys_idx is not None, sys_probs and auto_prob_style are ignored If sys_idx is None, automatically determine the system according to sys_probs or auto_prob_style, see the following.

Returns:
dict

The batch data

get_batch_mixed() dict[source]#

Get a batch of data from the data systems in the mixed way.

Returns:
dict

The batch data

_merge_batch_data(batch_data: list[dict]) dict[source]#

Merge batch data from different systems.

Parameters:
batch_datalist of dict

A list of batch data from different systems.

Returns:
dict

The merged batch data.

get_test(sys_idx: int | None = None, n_test: int = -1) dict[str, numpy.ndarray][source]#

Get test data from the the data systems.

Parameters:
sys_idx

The test dat of system with index sys_idx will be returned. If is None, the currently selected system will be returned.

n_test

Number of test data. If set to -1 all test data will be get.

get_sys_ntest(sys_idx: int | None = None) int[source]#

Get number of tests for the currently selected system, or one defined by sys_idx.

get_type_map() list[str][source]#

Get the type map.

get_nbatches() int[source]#

Get the total number of batches.

get_ntypes() int[source]#

Get the number of types.

get_nsystems() int[source]#

Get the number of data systems.

get_sys(idx: int) deepmd.utils.data.DeepmdData[source]#

Get a certain data system.

get_batch_size() int[source]#

Get the batch size.

print_summary(name: str) None[source]#
_make_auto_bs(rule: int) list[int][source]#
_make_auto_ts(percent: float) list[int][source]#
_check_type_map_consistency(type_map_list: list[list[str] | None]) list[str][source]#
class deepmd.tf.utils.LearningRateSchedule(params: dict[str, Any])[source]#

TensorFlow wrapper for BaseLR.

The learning rate is computed via tf.numpy_function(), which prevents TensorFlow from optimizing this operation in the graph. This overhead is typically negligible compared to forward/backward passes.

Parameters:
paramsdict[str, Any]

Learning rate configuration dictionary.

_params#
_base_lr: deepmd.dpmodel.utils.learning_rate.BaseLR | None = None#
start_lr() float[source]#

Get the starting learning rate.

Returns:
float

The starting learning rate.

property base_lr: deepmd.dpmodel.utils.learning_rate.BaseLR#

Get the built BaseLR instance.

Returns:
BaseLR

The built learning rate schedule.

Raises:
RuntimeError

If the schedule has not been built.

build(global_step: deepmd.tf.env.tf.Tensor, num_steps: int) deepmd.tf.env.tf.Tensor[source]#

Build a TensorFlow learning rate tensor.

Parameters:
global_steptf.Tensor

The global training step tensor.

num_stepsint

The total training steps.

Returns:
tf.Tensor

The learning rate tensor.

value(step: int) float[source]#

Get the learning rate at the given step.

Parameters:
stepint

The step index.

Returns:
float

The learning rate value.

Raises:
RuntimeError

If the schedule has not been built.

class deepmd.tf.utils.PairTab(filename: str, rcut: float | None = None)[source]#

Pairwise tabulated potential.

Parameters:
filename

File name for the short-range tabulated potential. The table is a text data file with (N_t + 1) * N_t / 2 + 1 columes. The first colume is the distance between atoms. The second to the last columes are energies for pairs of certain types. For example we have two atom types, 0 and 1. The columes from 2nd to 4th are for 0-0, 0-1 and 1-1 correspondingly.

rcutfloat, optional

cutoff raduis for the tabulated potential

data_type#
reinit(filename: str, rcut: float | None = None) None[source]#

Initialize the tabulated interaction.

Parameters:
filename

File name for the short-range tabulated potential. The table is a text data file with (N_t + 1) * N_t / 2 + 1 columes. The first colume is the distance between atoms. The second to the last columes are energies for pairs of certain types. For example we have two atom types, 0 and 1. The columes from 2nd to 4th are for 0-0, 0-1 and 1-1 correspondingly.

rcutfloat, optional

cutoff raduis for the tabulated potential

serialize() dict[source]#
classmethod deserialize(data: dict[str, Any]) PairTab[source]#
_check_table_upper_boundary() None[source]#

Update User Provided Table Based on rcut.

This function checks the upper boundary provided in the table against rcut. If the table upper boundary values decay to zero before rcut, padding zeros will be added to the table to cover rcut; if the table upper boundary values do not decay to zero before ruct, extrapolation will be performed till rcut.

Examples

table = [[0.005 1. 2. 3. ]

[0.01 0.8 1.6 2.4 ] [0.015 0. 1. 1.5 ]]

rcut = 0.022

new_table = [[0.005 1. 2. 3. ]

[0.01 0.8 1.6 2.4 ] [0.015 0. 1. 1.5 ] [0.02 0. 0. 0. ]


table = [[0.005 1. 2. 3. ]

[0.01 0.8 1.6 2.4 ] [0.015 0.5 1. 1.5 ] [0.02 0.25 0.4 0.75 ] [0.025 0. 0.1 0. ] [0.03 0. 0. 0. ]]

rcut = 0.031

new_table = [[0.005 1. 2. 3. ]

[0.01 0.8 1.6 2.4 ] [0.015 0.5 1. 1.5 ] [0.02 0.25 0.4 0.75 ] [0.025 0. 0.1 0. ] [0.03 0. 0. 0. ] [0.035 0. 0. 0. ]]

get() tuple[numpy.array, numpy.array][source]#

Get the serialized table.

_extrapolate_table(pad_extrapolation: numpy.array) numpy.array[source]#

Soomth extrapolation between table upper boundary and rcut.

This method should only be used when the table upper boundary rmax is smaller than rcut, and the table upper boundary values are not zeros. To simplify the problem, we use a single cubic spline between rmax and rcut for each pair of atom types. One can substitute this extrapolation to higher order polynomials if needed.

There are two scenarios:
  1. ruct - rmax >= hh:

    Set values at the grid point right before rcut to 0, and perform exterapolation between the grid point and rmax, this allows smooth decay to 0 at rcut.

  2. rcut - rmax < hh:

    Set values at rmax + hh to 0, and perform extrapolation between rmax and rmax + hh.

Parameters:
pad_extrapolationnp.array

The emepty grid that holds the extrapolation values.

Returns:
np.array

The cubic spline extrapolation.

_make_data() numpy.ndarray[source]#
class deepmd.tf.utils.Plugin[source]#

A class to register and restore plugins.

Attributes:
pluginsdict[str, object]

plugins

Examples

>>> plugin = Plugin()
>>> @plugin.register("xx")
    def xxx():
        pass
>>> print(plugin.plugins["xx"])
plugins#
__add__(other: Plugin) Plugin[source]#
register(key: str) collections.abc.Callable[[object], object][source]#

Register a plugin.

Parameters:
keystr

key of the plugin

Returns:
Callable[[object], object]

decorator

get_plugin(key: str) object[source]#

Visit a plugin by key.

Parameters:
keystr

key of the plugin

Returns:
object

the plugin

class deepmd.tf.utils.PluginVariant[source]#

A class to remove type from input arguments.