Source code for deepmd.utils.data

#!/usr/bin/env python3

import time
import glob
import numpy as np
import os.path
from typing import Tuple, List
import logging

from deepmd.env import GLOBAL_NP_FLOAT_PRECISION
from deepmd.env import GLOBAL_ENER_FLOAT_PRECISION
from deepmd.utils import random as dp_random
from deepmd.utils.path import DPPath

log = logging.getLogger(__name__)

class DeepmdData:
    """
    Class for a data system.

    It loads data from hard disk, and maintains the data as a `data_dict`

    Parameters
    ----------
    sys_path
        Path to the data system
    set_prefix
        Prefix for the directories of different sets
    shuffle_test
        If the test data are shuffled
    type_map
        Gives the name of different atom types
    modifier
        Data modifier that has the method `modify_data`
    trn_all_set
        Use all sets as training dataset. Otherwise, if the number of
        sets is more than 1, the last set is left for test.
    """
    def __init__(self,
                 sys_path: str,
                 set_prefix: str = 'set',
                 shuffle_test: bool = True,
                 type_map: List[str] = None,
                 modifier=None,
                 trn_all_set: bool = False):
        """
        Constructor
        """
        root = DPPath(sys_path)
        self.dirs = root.glob(set_prefix + ".*")
        self.dirs.sort()
        # load atom type
        self.atom_type = self._load_type(root)
        self.natoms = len(self.atom_type)
        # load atom type map
        self.type_map = self._load_type_map(root)
        if self.type_map is not None:
            assert len(self.type_map) >= max(self.atom_type) + 1
        # check pbc
        self.pbc = self._check_pbc(root)
        # enforce type_map if necessary
        if type_map is not None and self.type_map is not None:
            atom_type_ = [type_map.index(self.type_map[ii]) for ii in self.atom_type]
            self.atom_type = np.array(atom_type_, dtype=np.int32)
            self.type_map = type_map
        # make idx map
        self.idx_map = self._make_idx_map(self.atom_type)
        # train dirs
        self.test_dir = self.dirs[-1]
        if trn_all_set:
            self.train_dirs = self.dirs
        else:
            if len(self.dirs) == 1:
                self.train_dirs = self.dirs
            else:
                self.train_dirs = self.dirs[:-1]
        self.data_dict = {}
        # add box and coord
        self.add('box', 9, must=self.pbc)
        self.add('coord', 3, atomic=True, must=True)
        # set counters
        self.set_count = 0
        self.iterator = 0
        self.shuffle_test = shuffle_test
        # set modifier
        self.modifier = modifier
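    # Construction sketch (not part of the class; assumes `sys_path` contains
    # type.raw and one or more set.* directories, see the layout comment after
    # this class):
    #
    #     data = DeepmdData('path/to/system', set_prefix='set')
    #     data.get_natoms()   # number of atoms read from type.raw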
    def add(self,
            key: str,
            ndof: int,
            atomic: bool = False,
            must: bool = False,
            high_prec: bool = False,
            type_sel: List[int] = None,
            repeat: int = 1):
        """
        Add a data item to be loaded

        Parameters
        ----------
        key
            The key of the item. The corresponding data is stored in `sys_path/set.*/key.npy`
        ndof
            The number of dof
        atomic
            The item is an atomic property.
            If False, the size of the data should be nframes x ndof
            If True, the size of data should be nframes x natoms x ndof
        must
            The data file `sys_path/set.*/key.npy` must exist.
            If must is False and the data file does not exist, the `data_dict[find_key]` is set to 0.0
        high_prec
            Load the data and store in float64, otherwise in float32
        type_sel
            Select certain types of atoms
        repeat
            The data will be repeated `repeat` times.
        """
        self.data_dict[key] = {
            'ndof': ndof,
            'atomic': atomic,
            'must': must,
            'high_prec': high_prec,
            'type_sel': type_sel,
            'repeat': repeat,
            'reduce': None,
        }
        return self
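    # Typical registrations (a sketch; which items exist depends on the
    # training task, the key names below are only illustrative):
    #
    #     data.add('energy', 1, atomic=False, must=False, high_prec=True)
    #     data.add('force', 3, atomic=True, must=False)
    #     # energy.npy is then expected to hold nframes values,
    #     # force.npy to hold (nframes, natoms * 3) values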
    def reduce(self,
               key_out: str,
               key_in: str):
        """
        Generate a new item by reducing an existing atomic item over the atom dimension

        Parameters
        ----------
        key_out
            The name of the reduced item
        key_in
            The name of the data item to be reduced
        """
        assert key_in in self.data_dict, 'cannot find input key'
        assert self.data_dict[key_in]['atomic'], 'reduced property should be atomic'
        assert key_out not in self.data_dict, 'output key should not have been added'
        assert self.data_dict[key_in]['repeat'] == 1, 'reduced properties should not have been repeated'
        self.data_dict[key_out] = {
            'ndof': self.data_dict[key_in]['ndof'],
            'atomic': False,
            'must': True,
            'high_prec': True,
            'type_sel': None,
            'repeat': 1,
            'reduce': key_in,
        }
        return self
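    # For example, a per-frame energy can be defined as the sum of atomic
    # energies (a sketch; the key names are illustrative and 'energy' must
    # not have been added already):
    #
    #     data.add('atom_ener', 1, atomic=True, must=True)
    #     data.reduce('energy', 'atom_ener')
    #     # when a set is loaded, data['energy'] is computed by summing
    #     # atom_ener over the atom axis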
    def get_data_dict(self) -> dict:
        """
        Get the `data_dict`
        """
        return self.data_dict

    def check_batch_size(self, batch_size):
        """
        Check if the system can provide a batch of data with `batch_size` frames.
        Returns None if it can; otherwise returns the first set directory that is
        too small, together with its number of frames.
        """
        for ii in self.train_dirs:
            if self.data_dict['coord']['high_prec']:
                tmpe = (ii / "coord.npy").load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
            else:
                tmpe = (ii / "coord.npy").load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
            if tmpe.ndim == 1:
                tmpe = tmpe.reshape([1, -1])
            if tmpe.shape[0] < batch_size:
                return ii, tmpe.shape[0]
        return None

    def check_test_size(self, test_size):
        """
        Check if the system can provide a test dataset with `test_size` frames.
        Returns None if it can; otherwise returns the test directory together
        with its number of frames.
        """
        if self.data_dict['coord']['high_prec']:
            tmpe = (self.test_dir / "coord.npy").load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
        else:
            tmpe = (self.test_dir / "coord.npy").load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
        if tmpe.ndim == 1:
            tmpe = tmpe.reshape([1, -1])
        if tmpe.shape[0] < test_size:
            return self.test_dir, tmpe.shape[0]
        else:
            return None
    def get_batch(self, batch_size: int) -> dict:
        """
        Get a batch of data with `batch_size` frames. The frames are randomly picked from the data system.

        Parameters
        ----------
        batch_size
            size of the batch
        """
        if hasattr(self, 'batch_set'):
            set_size = self.batch_set["coord"].shape[0]
        else:
            set_size = 0
        if self.iterator + batch_size > set_size:
            self._load_batch_set(self.train_dirs[self.set_count % self.get_numb_set()])
            self.set_count += 1
            set_size = self.batch_set["coord"].shape[0]
            if self.modifier is not None:
                self.modifier.modify_data(self.batch_set)
        iterator_1 = self.iterator + batch_size
        if iterator_1 >= set_size:
            iterator_1 = set_size
        idx = np.arange(self.iterator, iterator_1)
        self.iterator += batch_size
        ret = self._get_subdata(self.batch_set, idx)
        return ret
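    # A minimal usage sketch (keys beyond 'coord' and 'box' depend on what
    # was registered with `add`; 'energy' here is only an example):
    #
    #     batch = data.get_batch(64)
    #     coords = batch['coord']        # shape (64, natoms * 3)
    #     found = batch['find_energy']   # 1.0 if energy.npy existed, else 0.0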
    def get_test(self, ntests: int = -1) -> dict:
        """
        Get the test data with `ntests` frames.

        Parameters
        ----------
        ntests
            Size of the test data set. If `ntests` is -1, all test data will be returned.
        """
        if not hasattr(self, 'test_set'):
            self._load_test_set(self.test_dir, self.shuffle_test)
        if ntests == -1:
            idx = None
        else:
            ntests_ = ntests if ntests < self.test_set['type'].shape[0] else self.test_set['type'].shape[0]
            idx = np.arange(ntests_)
        ret = self._get_subdata(self.test_set, idx=idx)
        if self.modifier is not None:
            self.modifier.modify_data(ret)
        return ret
    def get_ntypes(self) -> int:
        """
        Number of atom types in the system
        """
        if self.type_map is not None:
            return len(self.type_map)
        else:
            return max(self.get_atom_type()) + 1

    def get_type_map(self) -> List[str]:
        """
        Get the type map
        """
        return self.type_map

    def get_atom_type(self) -> List[int]:
        """
        Get atom types
        """
        return self.atom_type

    def get_numb_set(self) -> int:
        """
        Get number of training sets
        """
        return len(self.train_dirs)

    def get_numb_batch(self, batch_size: int, set_idx: int) -> int:
        """
        Get the number of batches in a set.
        """
        data = self._load_set(self.train_dirs[set_idx])
        ret = data["coord"].shape[0] // batch_size
        if ret == 0:
            ret = 1
        return ret

    def get_sys_numb_batch(self, batch_size: int) -> int:
        """
        Get the number of batches in the data system.
        """
        ret = 0
        for ii in range(len(self.train_dirs)):
            ret += self.get_numb_batch(batch_size, ii)
        return ret

    def get_natoms(self):
        """
        Get number of atoms
        """
        return len(self.atom_type)
    def get_natoms_vec(self, ntypes: int):
        """
        Get number of atoms and number of atoms in different types

        Parameters
        ----------
        ntypes
            Number of types (may be larger than the actual number of types in the system).

        Returns
        -------
        natoms
            natoms[0]: number of local atoms
            natoms[1]: total number of atoms held by this processor
            natoms[i]: 2 <= i < Ntypes+2, number of type i-2 atoms
        """
        natoms, natoms_vec = self._get_natoms_2(ntypes)
        tmp = [natoms, natoms]
        tmp = np.append(tmp, natoms_vec)
        return tmp.astype(np.int32)
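    # Layout of the returned vector, on a made-up system of 2 type-0 atoms
    # and 3 type-1 atoms, queried with ntypes = 3:
    #
    #     data.get_natoms_vec(3)  # -> array([5, 5, 2, 3, 0], dtype=int32)
    #
    # i.e. [natoms_local, natoms_total, n_type0, n_type1, n_type2].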
    def avg(self, key):
        """
        Return the average value of an item.
        """
        if key not in self.data_dict.keys():
            raise RuntimeError('key %s has not been added' % key)
        info = self.data_dict[key]
        ndof = info['ndof']
        eners = np.array([])
        for ii in self.train_dirs:
            data = self._load_set(ii)
            ei = data[key].reshape([-1, ndof])
            if eners.size == 0:
                eners = ei
            else:
                eners = np.concatenate((eners, ei), axis=0)
        if eners.size == 0:
            return 0
        else:
            return np.average(eners, axis=0)

    def _idx_map_sel(self, atom_type, type_sel):
        new_types = []
        for ii in atom_type:
            if ii in type_sel:
                new_types.append(ii)
        new_types = np.array(new_types, dtype=int)
        natoms = new_types.shape[0]
        idx = np.arange(natoms)
        idx_map = np.lexsort((idx, new_types))
        return idx_map

    def _get_natoms_2(self, ntypes):
        sample_type = self.atom_type
        natoms = len(sample_type)
        natoms_vec = np.zeros(ntypes).astype(int)
        for ii in range(ntypes):
            natoms_vec[ii] = np.count_nonzero(sample_type == ii)
        return natoms, natoms_vec

    def _get_subdata(self, data, idx=None):
        new_data = {}
        for ii in data:
            dd = data[ii]
            if 'find_' in ii:
                new_data[ii] = dd
            else:
                if idx is not None:
                    new_data[ii] = dd[idx]
                else:
                    new_data[ii] = dd
        return new_data

    def _load_batch_set(self, set_name: DPPath):
        self.batch_set = self._load_set(set_name)
        self.batch_set, _ = self._shuffle_data(self.batch_set)
        self.reset_get_batch()
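    # How the lexsort-based index maps work (a standalone sketch with made-up
    # types): atoms are reordered so that types are grouped and ascending,
    # while the original order within a type is preserved (a stable sort,
    # since `idx` breaks ties):
    #
    #     import numpy as np
    #     atom_type = np.array([1, 0, 1, 0])
    #     idx_map = np.lexsort((np.arange(4), atom_type))
    #     # idx_map -> array([1, 3, 0, 2]): the two type-0 atoms first,
    #     # then the two type-1 atoms, each in original order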
    def reset_get_batch(self):
        self.iterator = 0
    def _load_test_set(self, set_name: DPPath, shuffle_test):
        self.test_set = self._load_set(set_name)
        if shuffle_test:
            self.test_set, _ = self._shuffle_data(self.test_set)

    def _shuffle_data(self, data):
        ret = {}
        nframes = data['coord'].shape[0]
        idx = np.arange(nframes)
        dp_random.shuffle(idx)
        for kk in data:
            if type(data[kk]) == np.ndarray and \
               len(data[kk].shape) == 2 and \
               data[kk].shape[0] == nframes and \
               'find_' not in kk and \
               kk != 'type':
                ret[kk] = data[kk][idx]
            else:
                ret[kk] = data[kk]
        return ret, idx

    def _load_set(self, set_name: DPPath):
        # get nframes
        if not isinstance(set_name, DPPath):
            set_name = DPPath(set_name)
        path = set_name / "coord.npy"
        if self.data_dict['coord']['high_prec']:
            coord = path.load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
        else:
            coord = path.load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
        if coord.ndim == 1:
            coord = coord.reshape([1, -1])
        nframes = coord.shape[0]
        assert coord.shape[1] == self.data_dict['coord']['ndof'] * self.natoms
        # load keys
        data = {}
        data['type'] = np.tile(self.atom_type[self.idx_map], (nframes, 1))
        for kk in self.data_dict.keys():
            if self.data_dict[kk]['reduce'] is None:
                data['find_' + kk], data[kk] = self._load_data(
                    set_name, kk, nframes,
                    self.data_dict[kk]['ndof'],
                    atomic=self.data_dict[kk]['atomic'],
                    high_prec=self.data_dict[kk]['high_prec'],
                    must=self.data_dict[kk]['must'],
                    type_sel=self.data_dict[kk]['type_sel'],
                    repeat=self.data_dict[kk]['repeat'])
        for kk in self.data_dict.keys():
            if self.data_dict[kk]['reduce'] is not None:
                k_in = self.data_dict[kk]['reduce']
                ndof = self.data_dict[kk]['ndof']
                data['find_' + kk] = data['find_' + k_in]
                tmp_in = data[k_in].astype(GLOBAL_ENER_FLOAT_PRECISION)
                data[kk] = np.sum(np.reshape(tmp_in, [nframes, self.natoms, ndof]), axis=1)
        return data

    def _load_data(self,
                   set_name,
                   key,
                   nframes,
                   ndof_,
                   atomic=False,
                   must=True,
                   repeat=1,
                   high_prec=False,
                   type_sel=None):
        if atomic:
            natoms = self.natoms
            idx_map = self.idx_map
            # if type_sel, then revise natoms and idx_map
            if type_sel is not None:
                natoms = 0
                for jj in type_sel:
                    natoms += np.sum(self.atom_type == jj)
                idx_map = self._idx_map_sel(self.atom_type, type_sel)
            ndof = ndof_ * natoms
        else:
            ndof = ndof_
        path = set_name / (key + ".npy")
        if path.is_file():
            if high_prec:
                data = path.load_numpy().astype(GLOBAL_ENER_FLOAT_PRECISION)
            else:
                data = path.load_numpy().astype(GLOBAL_NP_FLOAT_PRECISION)
            try:
                # YWolfeee: deal with data shape errors
                if atomic:
                    data = data.reshape([nframes, natoms, -1])
                    data = data[:, idx_map, :]
                    data = data.reshape([nframes, -1])
                data = np.reshape(data, [nframes, ndof])
            except ValueError as err_message:
                explanation = "This error may occur when a label mismatches its name, i.e. a global tensor is stored in `atomic_tensor.npy` or an atomic tensor is stored in `tensor.npy`."
                log.error(str(err_message))
                log.error(explanation)
                raise ValueError(str(err_message) + ". " + explanation)
            if repeat != 1:
                data = np.repeat(data, repeat).reshape([nframes, -1])
            return np.float32(1.0), data
        elif must:
            raise RuntimeError("%s not found!" % path)
        else:
            if high_prec:
                data = np.zeros([nframes, ndof]).astype(GLOBAL_ENER_FLOAT_PRECISION)
            else:
                data = np.zeros([nframes, ndof]).astype(GLOBAL_NP_FLOAT_PRECISION)
            if repeat != 1:
                data = np.repeat(data, repeat).reshape([nframes, -1])
            return np.float32(0.0), data

    def _load_type(self, sys_path: DPPath):
        atom_type = (sys_path / "type.raw").load_txt(dtype=np.int32, ndmin=1)
        return atom_type

    def _make_idx_map(self, atom_type):
        natoms = atom_type.shape[0]
        idx = np.arange(natoms)
        idx_map = np.lexsort((idx, atom_type))
        return idx_map

    def _load_type_map(self, sys_path: DPPath):
        fname = sys_path / 'type_map.raw'
        if fname.is_file():
            return fname.load_txt(dtype=str).tolist()
        else:
            return None

    def _check_pbc(self, sys_path: DPPath):
        pbc = True
        if (sys_path / 'nopbc').is_file():
            pbc = False
        return pbc
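# On-disk layout consumed by DeepmdData (a sketch of the conventional
# DeePMD-kit system format; file names beyond the required type.raw,
# coord.npy, and box.npy are examples of optional items):
#
#     system/
#       type.raw          # one integer type per atom
#       type_map.raw      # optional, one type name per line
#       nopbc             # optional, marks a non-periodic system
#       set.000/
#         coord.npy       # (nframes, natoms * 3)
#         box.npy         # (nframes, 9), required when pbc
#         energy.npy      # optional labels registered via `add`
#         force.npy
#       set.001/
#         ...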
class DataSets(object):
    """
    Outdated class for one data system.

    .. deprecated:: 2.0.0
        This class is not maintained any more.
    """
    def __init__(self,
                 sys_path,
                 set_prefix,
                 seed=None,
                 shuffle_test=True):
        self.dirs = glob.glob(os.path.join(sys_path, set_prefix + ".*"))
        self.dirs.sort()
        # load atom type
        self.atom_type, self.idx_map, self.idx3_map = self.load_type(sys_path)
        # load atom type map
        self.type_map = self.load_type_map(sys_path)
        if self.type_map is not None:
            assert len(self.type_map) >= max(self.atom_type) + 1
        # train dirs
        self.test_dir = self.dirs[-1]
        if len(self.dirs) == 1:
            self.train_dirs = self.dirs
        else:
            self.train_dirs = self.dirs[:-1]
        # check fparam
        has_fparam = [os.path.isfile(os.path.join(ii, 'fparam.npy')) for ii in self.dirs]
        if any(has_fparam) and (not all(has_fparam)):
            raise RuntimeError("system %s: if any set has frame parameter, then all sets should have frame parameter" % sys_path)
        if all(has_fparam):
            self.has_fparam = 0
        else:
            self.has_fparam = -1
        # check aparam
        has_aparam = [os.path.isfile(os.path.join(ii, 'aparam.npy')) for ii in self.dirs]
        if any(has_aparam) and (not all(has_aparam)):
            raise RuntimeError("system %s: if any set has atomic parameter, then all sets should have atomic parameter" % sys_path)
        if all(has_aparam):
            self.has_aparam = 0
        else:
            self.has_aparam = -1
        # energy norm
        self.eavg = self.stats_energy()
        # load sets
        self.set_count = 0
        self.load_batch_set(self.train_dirs[self.set_count % self.get_numb_set()])
        self.load_test_set(self.test_dir, shuffle_test)
    def check_batch_size(self, batch_size):
        for ii in self.train_dirs:
            tmpe = np.load(os.path.join(ii, "coord.npy"))
            if tmpe.shape[0] < batch_size:
                return ii, tmpe.shape[0]
        return None

    def check_test_size(self, test_size):
        tmpe = np.load(os.path.join(self.test_dir, "coord.npy"))
        if tmpe.shape[0] < test_size:
            return self.test_dir, tmpe.shape[0]
        else:
            return None

    def load_type(self, sys_path):
        atom_type = np.loadtxt(os.path.join(sys_path, "type.raw"), dtype=np.int32, ndmin=1)
        natoms = atom_type.shape[0]
        idx = np.arange(natoms)
        idx_map = np.lexsort((idx, atom_type))
        atom_type3 = np.repeat(atom_type, 3)
        idx3 = np.arange(natoms * 3)
        idx3_map = np.lexsort((idx3, atom_type3))
        return atom_type, idx_map, idx3_map

    def load_type_map(self, sys_path):
        fname = os.path.join(sys_path, 'type_map.raw')
        if os.path.isfile(fname):
            with open(fname) as fp:
                return fp.read().split()
        else:
            return None

    def get_type_map(self):
        return self.type_map

    def get_numb_set(self):
        return len(self.train_dirs)

    def stats_energy(self):
        eners = np.array([])
        for ii in self.train_dirs:
            ener_file = os.path.join(ii, "energy.npy")
            if os.path.isfile(ener_file):
                ei = np.load(ener_file)
                eners = np.append(eners, ei)
        if eners.size == 0:
            return 0
        else:
            return np.average(eners)
    def load_energy(self, set_name, nframes, nvalues, energy_file, atom_energy_file):
        """
        return : coeff_ener, ener, coeff_atom_ener, atom_ener
        """
        # load atom_energy
        coeff_atom_ener, atom_ener = self.load_data(set_name, atom_energy_file, [nframes, nvalues], False)
        # ignore energy_file
        if coeff_atom_ener == 1:
            ener = np.sum(atom_ener, axis=1)
            coeff_ener = 1
        # load energy_file
        else:
            coeff_ener, ener = self.load_data(set_name, energy_file, [nframes], False)
        return coeff_ener, ener, coeff_atom_ener, atom_ener

    def load_data(self, set_name, data_name, shape, is_necessary=True):
        path = os.path.join(set_name, data_name + ".npy")
        if os.path.isfile(path):
            data = np.load(path)
            data = np.reshape(data, shape)
            if is_necessary:
                return data
            return 1, data
        elif is_necessary:
            raise OSError("%s not found!" % path)
        else:
            data = np.zeros(shape)
            return 0, data
    def load_set(self, set_name, shuffle=True):
        data = {}
        data["box"] = self.load_data(set_name, "box", [-1, 9])
        nframe = data["box"].shape[0]
        data["coord"] = self.load_data(set_name, "coord", [nframe, -1])
        ncoord = data["coord"].shape[1]
        if self.has_fparam >= 0:
            data["fparam"] = self.load_data(set_name, "fparam", [nframe, -1])
            if self.has_fparam == 0:
                self.has_fparam = data["fparam"].shape[1]
            else:
                assert self.has_fparam == data["fparam"].shape[1]
        if self.has_aparam >= 0:
            data["aparam"] = self.load_data(set_name, "aparam", [nframe, -1])
            if self.has_aparam == 0:
                self.has_aparam = data["aparam"].shape[1] // (ncoord // 3)
            else:
                assert self.has_aparam == data["aparam"].shape[1] // (ncoord // 3)
        data["prop_c"] = np.zeros(5)
        data["prop_c"][0], data["energy"], data["prop_c"][3], data["atom_ener"] \
            = self.load_energy(set_name, nframe, ncoord // 3, "energy", "atom_ener")
        data["prop_c"][1], data["force"] = self.load_data(set_name, "force", [nframe, ncoord], False)
        data["prop_c"][2], data["virial"] = self.load_data(set_name, "virial", [nframe, 9], False)
        data["prop_c"][4], data["atom_pref"] = self.load_data(set_name, "atom_pref", [nframe, ncoord // 3], False)
        data["atom_pref"] = np.repeat(data["atom_pref"], 3, axis=1)
        # shuffle data
        if shuffle:
            idx = np.arange(nframe)
            dp_random.shuffle(idx)
            for ii in data:
                if ii != "prop_c":
                    data[ii] = data[ii][idx]
        data["type"] = np.tile(self.atom_type, (nframe, 1))
        # sort according to type
        for ii in ["type", "atom_ener"]:
            data[ii] = data[ii][:, self.idx_map]
        for ii in ["coord", "force", "atom_pref"]:
            data[ii] = data[ii][:, self.idx3_map]
        return data
    def load_batch_set(self, set_name):
        self.batch_set = self.load_set(set_name, True)
        self.reset_iter()

    def load_test_set(self, set_name, shuffle_test):
        self.test_set = self.load_set(set_name, shuffle_test)

    def reset_iter(self):
        self.iterator = 0
        self.set_count += 1

    def get_set(self, data, idx=None):
        new_data = {}
        for ii in data:
            dd = data[ii]
            if ii == "prop_c":
                new_data[ii] = dd.astype(np.float32)
            else:
                if idx is not None:
                    dd = dd[idx]
                if ii == "type":
                    new_data[ii] = dd
                else:
                    new_data[ii] = dd.astype(GLOBAL_NP_FLOAT_PRECISION)
        return new_data
    def get_test(self):
        """
        Returned property prefactors `prop_c[0:4]`, in order:
        energy, force, virial, atom_ener
        """
        return self.get_set(self.test_set)

    def get_batch(self, batch_size):
        """
        Returned property prefactors `prop_c[0:4]`, in order:
        energy, force, virial, atom_ener
        """
        set_size = self.batch_set["energy"].shape[0]
        if self.iterator + batch_size > set_size:
            self.load_batch_set(self.train_dirs[self.set_count % self.get_numb_set()])
            set_size = self.batch_set["energy"].shape[0]
        iterator_1 = self.iterator + batch_size
        if iterator_1 >= set_size:
            iterator_1 = set_size
        idx = np.arange(self.iterator, iterator_1)
        self.iterator += batch_size
        return self.get_set(self.batch_set, idx)
    def get_natoms(self):
        sample_type = self.batch_set["type"][0]
        natoms = len(sample_type)
        return natoms

    def get_natoms_2(self, ntypes):
        sample_type = self.batch_set["type"][0]
        natoms = len(sample_type)
        natoms_vec = np.zeros(ntypes).astype(int)
        for ii in range(ntypes):
            natoms_vec[ii] = np.count_nonzero(sample_type == ii)
        return natoms, natoms_vec

    def get_natoms_vec(self, ntypes):
        natoms, natoms_vec = self.get_natoms_2(ntypes)
        tmp = [natoms, natoms]
        tmp = np.append(tmp, natoms_vec)
        return tmp.astype(np.int32)

    def set_numb_batch(self, batch_size):
        return self.batch_set["energy"].shape[0] // batch_size

    def get_sys_numb_batch(self, batch_size):
        return self.set_numb_batch(batch_size) * self.get_numb_set()

    def get_ener(self):
        return self.eavg

    def numb_fparam(self):
        return self.has_fparam

    def numb_aparam(self):
        return self.has_aparam