Source code for dpdata.plugins.deepmd
from __future__ import annotations
import os
try:
import h5py
except ImportError:
pass
import numpy as np
import dpdata
import dpdata.deepmd.comp
import dpdata.deepmd.hdf5
import dpdata.deepmd.mixed
import dpdata.deepmd.raw
from dpdata.driver import Driver
from dpdata.format import Format
[docs]
@Format.register("deepmd")
@Format.register("deepmd/raw")
class DeePMDRawFormat(Format):
[docs]
def from_system(self, file_name, type_map=None, **kwargs):
return dpdata.deepmd.raw.to_system_data(
file_name, type_map=type_map, labels=False
)
[docs]
def to_system(self, data, file_name, **kwargs):
"""Dump the system in deepmd raw format to directory `file_name`."""
dpdata.deepmd.raw.dump(file_name, data)
[docs]
def from_labeled_system(self, file_name, type_map=None, **kwargs):
return dpdata.deepmd.raw.to_system_data(
file_name, type_map=type_map, labels=True
)
MultiMode = Format.MultiModes.Directory
[docs]
@Format.register("deepmd/npy")
@Format.register("deepmd/comp")
class DeePMDCompFormat(Format):
[docs]
def from_system(self, file_name, type_map=None, **kwargs):
return dpdata.deepmd.comp.to_system_data(
file_name, type_map=type_map, labels=False
)
[docs]
def to_system(self, data, file_name, set_size=5000, prec=np.float64, **kwargs):
"""Dump the system in deepmd compressed format (numpy binary) to `folder`.
The frames are firstly split to sets, then dumped to seperated subfolders named as `folder/set.000`, `folder/set.001`, ....
Each set contains `set_size` frames.
The last set may have less frames than `set_size`.
Parameters
----------
data : dict
System data
file_name : str
The output folder
set_size : int
The size of each set.
prec : {numpy.float32, numpy.float64}
The floating point precision of the compressed data
**kwargs : dict
other parameters
"""
dpdata.deepmd.comp.dump(file_name, data, set_size=set_size, comp_prec=prec)
[docs]
def from_labeled_system(self, file_name, type_map=None, **kwargs):
return dpdata.deepmd.comp.to_system_data(
file_name, type_map=type_map, labels=True
)
MultiMode = Format.MultiModes.Directory
[docs]
@Format.register("deepmd/npy/mixed")
class DeePMDMixedFormat(Format):
"""Mixed type numpy format for DeePMD-kit.
Under this format, systems with the same number of atoms but different formula can be put together
for a larger system, especially when the frame numbers in systems are sparse.
This also helps to mixture the type information together for model training with type embedding network.
Examples
--------
Dump a MultiSystems into a mixed type numpy directory:
>>> import dpdata
>>> dpdata.MultiSystems(*systems).to_deepmd_npy_mixed("mixed_dir")
Load a mixed type data into a MultiSystems:
>>> import dpdata
>>> dpdata.MultiSystems().load_systems_from_file("mixed_dir", fmt="deepmd/npy/mixed")
"""
[docs]
def from_system_mix(self, file_name, type_map=None, **kwargs):
return dpdata.deepmd.mixed.to_system_data(
file_name, type_map=type_map, labels=False
)
[docs]
def to_system(
self, data, file_name, set_size: int = 2000, prec=np.float64, **kwargs
):
"""Dump the system in deepmd mixed type format (numpy binary) to `folder`.
The frames were already split to different systems, so these frames can be dumped to one single subfolders
named as `folder/set.000`, containing less than `set_size` frames.
Parameters
----------
data : dict
System data
file_name : str
The output folder
set_size : int, default=2000
set size
prec : {numpy.float32, numpy.float64}
The floating point precision of the compressed data
**kwargs : dict
other parameters
"""
dpdata.deepmd.mixed.dump(file_name, data, set_size=set_size, comp_prec=prec)
[docs]
def from_labeled_system_mix(self, file_name, type_map=None, **kwargs):
return dpdata.deepmd.mixed.to_system_data(
file_name, type_map=type_map, labels=True
)
[docs]
def mix_system(self, *system, type_map, **kwargs):
"""Mix the systems into mixed_type ones according to the unified given type_map.
Parameters
----------
*system : System
The systems to mix
type_map : list of str
Maps atom type to name
**kwargs : dict
other parameters
Returns
-------
mixed_systems: dict
dict of mixed system with key 'atom_numbs'
"""
return dpdata.deepmd.mixed.mix_system(*system, type_map=type_map, **kwargs)
[docs]
def from_multi_systems(self, directory, **kwargs):
sys_dir = []
for root, dirs, files in os.walk(directory):
if (
"type_map.raw" in files
): # mixed_type format systems must have type_map.raw
sys_dir.append(root)
return sys_dir
MultiMode = Format.MultiModes.Directory
[docs]
@Format.register("deepmd/hdf5")
class DeePMDHDF5Format(Format):
"""HDF5 format for DeePMD-kit.
Examples
--------
Dump a MultiSystems to a HDF5 file:
>>> import dpdata
>>> dpdata.MultiSystems().from_deepmd_npy("data").to_deepmd_hdf5("data.hdf5")
"""
def _from_system(
self,
file_name: str | (h5py.Group | h5py.File),
type_map: list[str],
labels: bool,
):
"""Convert HDF5 file to System or LabeledSystem data.
This method is used to switch from labeled or non-labeled options.
Parameters
----------
file_name : str or h5py.Group or h5py.File
file name of the HDF5 file or HDF5 object. If it is a string,
hashtag is used to split path to the HDF5 file and the HDF5 group
type_map : dict[str]
type map
labels : bool
if Labeled
Returns
-------
dict
System or LabeledSystem data
Raises
------
TypeError
file_name is not str or h5py.Group or h5py.File
"""
if isinstance(file_name, (h5py.Group, h5py.File)):
return dpdata.deepmd.hdf5.to_system_data(
file_name, "", type_map=type_map, labels=labels
)
elif isinstance(file_name, str):
s = file_name.split("#")
name = s[1] if len(s) > 1 else ""
with h5py.File(s[0], "r") as f:
return dpdata.deepmd.hdf5.to_system_data(
f, name, type_map=type_map, labels=labels
)
else:
raise TypeError("Unsupported file_name")
[docs]
def from_system(
self,
file_name: str | (h5py.Group | h5py.File),
type_map: list[str] | None = None,
**kwargs,
) -> dict:
"""Convert HDF5 file to System data.
Parameters
----------
file_name : str or h5py.Group or h5py.File
file name of the HDF5 file or HDF5 object. If it is a string,
hashtag is used to split path to the HDF5 file and the HDF5 group
type_map : dict[str]
type map
**kwargs : dict
other parameters
Returns
-------
dict
System data
Raises
------
TypeError
file_name is not str or h5py.Group or h5py.File
"""
return self._from_system(file_name, type_map=type_map, labels=False)
[docs]
def from_labeled_system(
self,
file_name: str | (h5py.Group | h5py.File),
type_map: list[str] | None = None,
**kwargs,
) -> dict:
"""Convert HDF5 file to LabeledSystem data.
Parameters
----------
file_name : str or h5py.Group or h5py.File
file name of the HDF5 file or HDF5 object. If it is a string,
hashtag is used to split path to the HDF5 file and the HDF5 group
type_map : dict[str]
type map
**kwargs : dict
other parameters
Returns
-------
dict
LabeledSystem data
Raises
------
TypeError
file_name is not str or h5py.Group or h5py.File
"""
return self._from_system(file_name, type_map=type_map, labels=True)
[docs]
def to_system(
self,
data: dict,
file_name: str | (h5py.Group | h5py.File),
set_size: int = 5000,
comp_prec: np.dtype = np.float64,
**kwargs,
):
"""Convert System data to HDF5 file.
Parameters
----------
data : dict
data dict
file_name : str or h5py.Group or h5py.File
file name of the HDF5 file or HDF5 object. If it is a string,
hashtag is used to split path to the HDF5 file and the HDF5 group
set_size : int, default=5000
set size
comp_prec : np.dtype
data precision
**kwargs : dict
other parameters
"""
if isinstance(file_name, (h5py.Group, h5py.File)):
dpdata.deepmd.hdf5.dump(
file_name, "", data, set_size=set_size, comp_prec=comp_prec
)
elif isinstance(file_name, str):
s = file_name.split("#")
name = s[1] if len(s) > 1 else ""
with h5py.File(s[0], "w") as f:
dpdata.deepmd.hdf5.dump(
f, name, data, set_size=set_size, comp_prec=comp_prec
)
else:
raise TypeError("Unsupported file_name")
[docs]
def from_multi_systems(self, directory: str, **kwargs) -> h5py.Group:
"""Generate HDF5 groups from a HDF5 file, which will be
passed to `from_system`.
Parameters
----------
directory : str
HDF5 file name
**kwargs : dict
other parameters
Yields
------
h5py.Group
a HDF5 group in the HDF5 file
"""
with h5py.File(directory, "r") as f:
for ff in f.keys():
yield f[ff]
[docs]
def to_multi_systems(
self, formulas: list[str], directory: str, **kwargs
) -> h5py.Group:
"""Generate HDF5 groups, which will be passed to `to_system`.
Parameters
----------
formulas : list[str]
formulas of MultiSystems
directory : str
HDF5 file name
**kwargs : dict
other parameters
Yields
------
h5py.Group
a HDF5 group with the name of formula
"""
with h5py.File(directory, "w") as f:
for ff in formulas:
yield f.create_group(ff)
[docs]
@Driver.register("dp")
@Driver.register("deepmd")
@Driver.register("deepmd-kit")
class DPDriver(Driver):
"""DeePMD-kit driver.
Parameters
----------
dp : deepmd.DeepPot or str
The deepmd-kit potential class or the filename of the model.
Examples
--------
>>> DPDriver("frozen_model.pb")
"""
def __init__(self, dp: str) -> None:
try:
# DP 1.x
import deepmd.DeepPot as DeepPot
except ModuleNotFoundError:
# DP 2.x
from deepmd.infer import DeepPot
if not isinstance(dp, DeepPot):
self.dp = DeepPot(dp)
else:
self.dp = dp
self.enable_auto_batch_size = (
"auto_batch_size" in DeepPot.__init__.__code__.co_varnames
)
[docs]
def label(self, data: dict) -> dict:
"""Label a system data by deepmd-kit. Returns new data with energy, forces, and virials.
Parameters
----------
data : dict
data with coordinates and atom types
Returns
-------
dict
labeled data with energies and forces
"""
type_map = self.dp.get_type_map()
ori_sys = dpdata.System.from_dict({"data": data})
ori_sys_copy = ori_sys.copy()
ori_sys.sort_atom_names(type_map=type_map)
atype = ori_sys["atom_types"]
ori_sys = ori_sys_copy
if not self.enable_auto_batch_size:
labeled_sys = dpdata.LabeledSystem()
for ss in ori_sys:
coord = ss["coords"].reshape((1, ss.get_natoms() * 3))
if not ss.nopbc:
cell = ss["cells"].reshape((1, 9))
else:
cell = None
e, f, v = self.dp.eval(coord, cell, atype)
data = ss.data
data["energies"] = e.reshape((1,))
data["forces"] = f.reshape((1, ss.get_natoms(), 3))
data["virials"] = v.reshape((1, 3, 3))
this_sys = dpdata.LabeledSystem.from_dict({"data": data})
labeled_sys.append(this_sys)
data = labeled_sys.data
else:
# since v2.0.2, auto batch size is supported
coord = ori_sys.data["coords"].reshape(
(ori_sys.get_nframes(), ori_sys.get_natoms() * 3)
)
if not ori_sys.nopbc:
cell = ori_sys.data["cells"].reshape((ori_sys.get_nframes(), 9))
else:
cell = None
e, f, v = self.dp.eval(coord, cell, atype)
data = ori_sys.data.copy()
data["energies"] = e.reshape((ori_sys.get_nframes(),))
data["forces"] = f.reshape((ori_sys.get_nframes(), ori_sys.get_natoms(), 3))
data["virials"] = v.reshape((ori_sys.get_nframes(), 3, 3))
return data