import glob
import os
import shutil
import numpy as np
[docs]
def load_type(folder):
data = {}
data["atom_names"] = []
# if find type_map.raw, use it
assert os.path.isfile(
os.path.join(folder, "type_map.raw")
), "Mixed type system must have type_map.raw!"
with open(os.path.join(folder, "type_map.raw")) as fp:
data["atom_names"] = fp.read().split()
return data
def _cond_load_data(fname):
tmp = None
if os.path.isfile(fname):
tmp = np.load(fname)
return tmp
def _load_set(folder, nopbc: bool):
coords = np.load(os.path.join(folder, "coord.npy"))
if nopbc:
cells = np.zeros((coords.shape[0], 3, 3))
else:
cells = np.load(os.path.join(folder, "box.npy"))
eners = _cond_load_data(os.path.join(folder, "energy.npy"))
forces = _cond_load_data(os.path.join(folder, "force.npy"))
virs = _cond_load_data(os.path.join(folder, "virial.npy"))
real_atom_types = np.load(os.path.join(folder, "real_atom_types.npy"))
return cells, coords, eners, forces, virs, real_atom_types
[docs]
def to_system_data(folder, type_map=None, labels=True):
# data is empty
data = load_type(folder)
old_type_map = data["atom_names"].copy()
if type_map is not None:
assert isinstance(type_map, list)
missing_type = [i for i in old_type_map if i not in type_map]
assert (
not missing_type
), f"These types are missing in selected type_map: {missing_type} !"
index_map = np.array([type_map.index(i) for i in old_type_map])
data["atom_names"] = type_map.copy()
else:
index_map = None
data["orig"] = np.zeros([3])
if os.path.isfile(os.path.join(folder, "nopbc")):
data["nopbc"] = True
sets = sorted(glob.glob(os.path.join(folder, "set.*")))
all_cells = []
all_coords = []
all_eners = []
all_forces = []
all_virs = []
all_real_atom_types = []
for ii in sets:
cells, coords, eners, forces, virs, real_atom_types = _load_set(
ii, data.get("nopbc", False)
)
nframes = np.reshape(cells, [-1, 3, 3]).shape[0]
all_cells.append(np.reshape(cells, [nframes, 3, 3]))
all_coords.append(np.reshape(coords, [nframes, -1, 3]))
if index_map is None:
all_real_atom_types.append(np.reshape(real_atom_types, [nframes, -1]))
else:
all_real_atom_types.append(
np.reshape(index_map[real_atom_types], [nframes, -1])
)
if eners is not None:
eners = np.reshape(eners, [nframes])
if labels:
if eners is not None and eners.size > 0:
all_eners.append(np.reshape(eners, [nframes]))
if forces is not None and forces.size > 0:
all_forces.append(np.reshape(forces, [nframes, -1, 3]))
if virs is not None and virs.size > 0:
all_virs.append(np.reshape(virs, [nframes, 3, 3]))
all_cells_concat = np.concatenate(all_cells, axis=0)
all_coords_concat = np.concatenate(all_coords, axis=0)
all_real_atom_types_concat = np.concatenate(all_real_atom_types, axis=0)
all_eners_concat = None
all_forces_concat = None
all_virs_concat = None
if len(all_eners) > 0:
all_eners_concat = np.concatenate(all_eners, axis=0)
if len(all_forces) > 0:
all_forces_concat = np.concatenate(all_forces, axis=0)
if len(all_virs) > 0:
all_virs_concat = np.concatenate(all_virs, axis=0)
data_list = []
while True:
if all_real_atom_types_concat.size == 0:
break
temp_atom_numbs = [
np.count_nonzero(all_real_atom_types_concat[0] == i)
for i in range(len(data["atom_names"]))
]
# temp_formula = formula(data['atom_names'], temp_atom_numbs)
temp_idx = np.arange(all_real_atom_types_concat.shape[0])[
(all_real_atom_types_concat == all_real_atom_types_concat[0]).all(-1)
]
rest_idx = np.arange(all_real_atom_types_concat.shape[0])[
(all_real_atom_types_concat != all_real_atom_types_concat[0]).any(-1)
]
temp_data = data.copy()
temp_data["atom_names"] = data["atom_names"].copy()
temp_data["atom_numbs"] = temp_atom_numbs
temp_data["atom_types"] = all_real_atom_types_concat[0]
all_real_atom_types_concat = all_real_atom_types_concat[rest_idx]
temp_data["cells"] = all_cells_concat[temp_idx]
all_cells_concat = all_cells_concat[rest_idx]
temp_data["coords"] = all_coords_concat[temp_idx]
all_coords_concat = all_coords_concat[rest_idx]
if labels:
if all_eners_concat is not None and all_eners_concat.size > 0:
temp_data["energies"] = all_eners_concat[temp_idx]
all_eners_concat = all_eners_concat[rest_idx]
if all_forces_concat is not None and all_forces_concat.size > 0:
temp_data["forces"] = all_forces_concat[temp_idx]
all_forces_concat = all_forces_concat[rest_idx]
if all_virs_concat is not None and all_virs_concat.size > 0:
temp_data["virials"] = all_virs_concat[temp_idx]
all_virs_concat = all_virs_concat[rest_idx]
data_list.append(temp_data)
return data_list
[docs]
def dump(folder, data, set_size=2000, comp_prec=np.float32, remove_sets=True):
os.makedirs(folder, exist_ok=True)
sets = sorted(glob.glob(os.path.join(folder, "set.*")))
if len(sets) > 0:
if remove_sets:
for ii in sets:
shutil.rmtree(ii)
else:
raise RuntimeError(
"found "
+ str(sets)
+ " in "
+ folder
+ "not a clean deepmd raw dir. please firstly clean set.* then try compress"
)
# if not converted to mixed
if "real_atom_types" not in data:
from dpdata import LabeledSystem, System
if "energies" in data:
temp_sys = LabeledSystem(data=data)
else:
temp_sys = System(data=data)
temp_sys.convert_to_mixed_type()
# dump raw
np.savetxt(os.path.join(folder, "type.raw"), data["atom_types"], fmt="%d")
np.savetxt(os.path.join(folder, "type_map.raw"), data["real_atom_names"], fmt="%s")
# BondOrder System
if "bonds" in data:
np.savetxt(
os.path.join(folder, "bonds.raw"),
data["bonds"],
header="begin_atom, end_atom, bond_order",
)
if "formal_charges" in data:
np.savetxt(os.path.join(folder, "formal_charges.raw"), data["formal_charges"])
# reshape frame properties and convert prec
nframes = data["cells"].shape[0]
cells = np.reshape(data["cells"], [nframes, 9]).astype(comp_prec)
coords = np.reshape(data["coords"], [nframes, -1]).astype(comp_prec)
eners = None
forces = None
virials = None
real_atom_types = None
if "energies" in data:
eners = np.reshape(data["energies"], [nframes]).astype(comp_prec)
if "forces" in data:
forces = np.reshape(data["forces"], [nframes, -1]).astype(comp_prec)
if "virials" in data:
virials = np.reshape(data["virials"], [nframes, 9]).astype(comp_prec)
if "atom_pref" in data:
atom_pref = np.reshape(data["atom_pref"], [nframes, -1]).astype(comp_prec)
if "real_atom_types" in data:
real_atom_types = np.reshape(data["real_atom_types"], [nframes, -1]).astype(
np.int64
)
# dump frame properties: cell, coord, energy, force and virial
nsets = nframes // set_size
if set_size * nsets < nframes:
nsets += 1
for ii in range(nsets):
set_stt = ii * set_size
set_end = (ii + 1) * set_size
set_folder = os.path.join(folder, "set.%06d" % ii)
os.makedirs(set_folder)
np.save(os.path.join(set_folder, "box"), cells[set_stt:set_end])
np.save(os.path.join(set_folder, "coord"), coords[set_stt:set_end])
if eners is not None:
np.save(os.path.join(set_folder, "energy"), eners[set_stt:set_end])
if forces is not None:
np.save(os.path.join(set_folder, "force"), forces[set_stt:set_end])
if virials is not None:
np.save(os.path.join(set_folder, "virial"), virials[set_stt:set_end])
if real_atom_types is not None:
np.save(
os.path.join(set_folder, "real_atom_types"),
real_atom_types[set_stt:set_end],
)
if "atom_pref" in data:
np.save(os.path.join(set_folder, "atom_pref"), atom_pref[set_stt:set_end])
try:
os.remove(os.path.join(folder, "nopbc"))
except OSError:
pass
if data.get("nopbc", False):
with open(os.path.join(folder, "nopbc"), "w") as fw_nopbc:
pass
[docs]
def mix_system(*system, type_map, **kwargs):
"""Mix the systems into mixed_type ones according to the unified given type_map.
Parameters
----------
*system : System
The systems to mix
type_map : list of str
Maps atom type to name
**kwargs : dict
Other parameters
Returns
-------
mixed_systems: dict
dict of mixed system with key 'atom_numbs'
"""
mixed_systems = {}
temp_systems = {}
atom_numbs_frame_index = {} # index of frames in cur sys
for sys in system:
tmp_sys = sys.copy()
natom = tmp_sys.get_natoms()
tmp_sys.convert_to_mixed_type(type_map=type_map)
if str(natom) not in atom_numbs_frame_index:
atom_numbs_frame_index[str(natom)] = 0
atom_numbs_frame_index[str(natom)] += tmp_sys.get_nframes()
if str(natom) not in temp_systems or not temp_systems[str(natom)]:
temp_systems[str(natom)] = tmp_sys
else:
temp_systems[str(natom)].append(tmp_sys)
for natom in temp_systems:
if atom_numbs_frame_index[natom] > 0:
sys_name = f"{natom}"
mixed_systems[sys_name] = temp_systems[natom]
return mixed_systems
[docs]
def split_system(sys, split_num=10000):
rest = sys.get_nframes() - split_num
if rest <= 0:
return sys, None, 0
else:
split_sys = sys.sub_system(range(split_num))
rest_sys = sys.sub_system(range(split_num, sys.get_nframes()))
return split_sys, rest_sys, rest