Source code for dpdata.lammps.dump

#!/usr/bin/env python3
from __future__ import annotations

import os
import sys
from typing import TYPE_CHECKING

import numpy as np

from dpdata.utils import open_file

if TYPE_CHECKING:
    from dpdata.utils import FileType

lib_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(lib_path)
import warnings

import lmp


[docs] class UnwrapWarning(UserWarning): pass
warnings.simplefilter("once", UnwrapWarning) def _get_block(lines, key): for idx in range(len(lines)): if ("ITEM: " + key) in lines[idx]: break idx_s = idx + 1 for idx in range(idx_s, len(lines)): if ("ITEM: ") in lines[idx]: break idx_e = idx if idx_e == len(lines) - 1: idx_e += 1 return lines[idx_s:idx_e], lines[idx_s - 1]
[docs] def get_atype(lines, type_idx_zero=False): blk, head = _get_block(lines, "ATOMS") keys = head.split() id_idx = keys.index("id") - 2 tidx = keys.index("type") - 2 atype = [] for ii in blk: atype.append([int(ii.split()[id_idx]), int(ii.split()[tidx])]) atype.sort() atype = np.array(atype, dtype=int) if type_idx_zero: return atype[:, 1] - 1 else: return atype[:, 1]
[docs] def get_natoms(lines): blk, head = _get_block(lines, "NUMBER OF ATOMS") return int(blk[0])
[docs] def get_natomtypes(lines): atype = get_atype(lines) return max(atype)
[docs] def get_natoms_vec(lines): atype = get_atype(lines) natoms_vec = [] natomtypes = get_natomtypes(lines) for ii in range(natomtypes): natoms_vec.append(sum(atype == ii + 1)) assert sum(natoms_vec) == get_natoms(lines) return natoms_vec
[docs] def get_coordtype_and_scalefactor(keys): # 4 types in total,with different scaling factor key_pc = ["x", "y", "z"] # plain cartesian, sf = 1 key_uc = ["xu", "yu", "zu"] # unwraped cartesian, sf = 1 key_s = ["xs", "ys", "zs"] # scaled by lattice parameter, sf = lattice parameter key_su = ["xsu", "ysu", "zsu"] # scaled and unfolded,sf = lattice parameter lmp_coor_type = [key_pc, key_uc, key_s, key_su] sf = [0, 0, 1, 1] uw = [0, 1, 0, 1] # unwraped or not for k in range(4): if all(i in keys for i in lmp_coor_type[k]): return lmp_coor_type[k], sf[k], uw[k]
[docs] def safe_get_posi(lines, cell, orig=np.zeros(3), unwrap=False): blk, head = _get_block(lines, "ATOMS") keys = head.split() coord_tp_and_sf = get_coordtype_and_scalefactor(keys) assert coord_tp_and_sf is not None, "Dump file does not contain atomic coordinates!" coordtype, sf, uw = coord_tp_and_sf id_idx = keys.index("id") - 2 xidx = keys.index(coordtype[0]) - 2 yidx = keys.index(coordtype[1]) - 2 zidx = keys.index(coordtype[2]) - 2 posis = [] for ii in blk: words = ii.split() posis.append( [ float(words[id_idx]), float(words[xidx]), float(words[yidx]), float(words[zidx]), ] ) posis.sort() posis = np.array(posis)[:, 1:4] if not sf: posis = (posis - orig) @ np.linalg.inv( cell ) # Convert to scaled coordinates for unscaled coordinates if uw and unwrap: return ( posis @ cell ) # convert scaled coordinates back to Cartesien coordinates unwrap at the periodic boundaries else: if uw and not unwrap: warnings.warn( message="Your dump file contains unwrapped coordinates, but you did not specify unwrapping (unwrap = True). The default is wrapping at periodic boundaries (unwrap = False).\n", category=UnwrapWarning, ) return ( (posis % 1) @ cell ) # Convert scaled coordinates back to Cartesien coordinates with wraping at periodic boundary conditions
[docs] def get_dumpbox(lines): blk, h = _get_block(lines, "BOX BOUNDS") bounds = np.zeros([3, 2]) tilt = np.zeros([3]) load_tilt = "xy xz yz" in h for dd in range(3): info = [float(jj) for jj in blk[dd].split()] bounds[dd][0] = info[0] bounds[dd][1] = info[1] if load_tilt: tilt[dd] = info[2] return bounds, tilt
[docs] def dumpbox2box(bounds, tilt): xy = tilt[0] xz = tilt[1] yz = tilt[2] xlo = bounds[0][0] - min(0.0, xy, xz, xy + xz) xhi = bounds[0][1] - max(0.0, xy, xz, xy + xz) ylo = bounds[1][0] - min(0.0, yz) yhi = bounds[1][1] - max(0.0, yz) zlo = bounds[2][0] zhi = bounds[2][1] info = [[xlo, xhi], [ylo, yhi], [zlo, zhi]] return lmp.lmpbox2box(info, tilt)
[docs] def box2dumpbox(orig, box): lohi, tilt = lmp.box2lmpbox(orig, box) xy = tilt[0] xz = tilt[1] yz = tilt[2] bounds = np.zeros([3, 2]) bounds[0][0] = lohi[0][0] + min(0.0, xy, xz, xy + xz) bounds[0][1] = lohi[0][1] + max(0.0, xy, xz, xy + xz) bounds[1][0] = lohi[1][0] + min(0.0, yz) bounds[1][1] = lohi[1][1] + max(0.0, yz) bounds[2][0] = lohi[2][0] bounds[2][1] = lohi[2][1] return bounds, tilt
[docs] def load_file(fname: FileType, begin=0, step=1): lines = [] buff = [] cc = -1 with open_file(fname) as fp: while True: line = fp.readline().rstrip("\n") if not line: if cc >= begin and (cc - begin) % step == 0: lines += buff buff = [] cc += 1 return lines if "ITEM: TIMESTEP" in line: if cc >= begin and (cc - begin) % step == 0: lines += buff buff = [] cc += 1 if cc >= begin and (cc - begin) % step == 0: buff.append(line)
[docs] def get_spin_keys(inputfile): """ Read input file and get the keys for spin info in dump. Parameters ---------- inputfile : str Path to the input file. Returns ------- list or None List of spin info keys if found, None otherwise. """ if inputfile is None: return None if not os.path.isfile(inputfile): warnings.warn(f"Input file {inputfile} not found.") return None with open(inputfile) as f: for line in f.readlines(): ls = line.split() if ( len(ls) > 7 and ls[0] == "compute" and all(key in ls for key in ["sp", "spx", "spy", "spz"]) ): compute_name = ls[1] return [ f"c_{compute_name}[{ls.index(key) - 3}]" for key in ["sp", "spx", "spy", "spz"] ] return None
[docs] def get_spin(lines, spin_keys): """ Get the spin info from the dump file. Parameters ---------- lines : list The content of the dump file. spin_keys : list The keys for spin info in dump file. the spin info is stored in sp, spx, spy, spz or spin_keys, which is the spin norm and the spin vector 1 1 0.00141160 5.64868599 0.01005602 1.54706291 0.00000000 0.00000000 1.00000000 -1.40772100 -2.03739417 -1522.64797384 -0.00397809 -0.00190426 -0.00743976 """ blk, head = _get_block(lines, "ATOMS") heads = head.split() if spin_keys is not None and all(i in heads for i in spin_keys): key = spin_keys else: return None try: idx_id = heads.index("id") - 2 idx_sp, idx_spx, idx_spy, idx_spz = (heads.index(k) - 2 for k in key) norm = [] vec = [] atom_ids = [] for line in blk: words = line.split() norm.append([float(words[idx_sp])]) vec.append( [float(words[idx_spx]), float(words[idx_spy]), float(words[idx_spz])] ) atom_ids.append(int(words[idx_id])) spin = np.array(norm) * np.array(vec) atom_ids, spin = zip(*sorted(zip(atom_ids, spin))) return np.array(spin) except (ValueError, IndexError) as e: warnings.warn(f"Error processing spin data: {str(e)}") return None
[docs] def system_data( lines, type_map=None, type_idx_zero=True, unwrap=False, input_file=None ): array_lines = split_traj(lines) lines = array_lines[0] system = {} system["atom_numbs"] = get_natoms_vec(lines) system["atom_names"] = [] if type_map is None: for ii in range(len(system["atom_numbs"])): system["atom_names"].append("TYPE_%d" % ii) # noqa: UP031 else: assert len(type_map) >= len(system["atom_numbs"]) for ii in range(len(system["atom_numbs"])): system["atom_names"].append(type_map[ii]) bounds, tilt = get_dumpbox(lines) orig, cell = dumpbox2box(bounds, tilt) system["orig"] = np.array(orig) - np.array(orig) system["cells"] = [np.array(cell)] system["atom_types"] = get_atype(lines, type_idx_zero=type_idx_zero) system["coords"] = [safe_get_posi(lines, cell, np.array(orig), unwrap)] spin_keys = get_spin_keys(input_file) spin = get_spin(lines, spin_keys) has_spin = False if spin is not None: system["spins"] = [spin] has_spin = True for ii in range(1, len(array_lines)): bounds, tilt = get_dumpbox(array_lines[ii]) orig, cell = dumpbox2box(bounds, tilt) system["cells"].append(cell) atype = get_atype(array_lines[ii], type_idx_zero=type_idx_zero) # map atom type; a[as[a][as[as[b]]]] = b[as[b][as^{-1}[b]]] = b[id] idx = np.argsort(atype, kind="stable")[ np.argsort(np.argsort(system["atom_types"], kind="stable"), kind="stable") ] system["coords"].append( safe_get_posi(array_lines[ii], cell, np.array(orig), unwrap)[idx] ) if has_spin: spin = get_spin(array_lines[ii], spin_keys) if spin is not None: system["spins"].append(spin[idx]) else: warnings.warn( f"Warning: spin info is not found in frame {ii}, remove spin info." ) system.pop("spins") has_spin = False if has_spin: system["spins"] = np.array(system["spins"]) system["cells"] = np.array(system["cells"]) system["coords"] = np.array(system["coords"]) return system
[docs] def split_traj(dump_lines): marks = [] for idx, ii in enumerate(dump_lines): if "ITEM: TIMESTEP" in ii: marks.append(idx) if len(marks) == 0: return None elif len(marks) == 1: return [dump_lines] else: block_size = marks[1] - marks[0] ret = [] for ii in marks: ret.append(dump_lines[ii : ii + block_size]) # for ii in range(len(marks)-1): # assert(marks[ii+1] - marks[ii] == block_size) return ret return None
if __name__ == "__main__": # fname = 'dump.hti' # lines = open(fname).read().split('\n') # # print(get_natoms(lines)) # # print(get_natomtypes(lines)) # # print(get_natoms_vec(lines)) # posi = get_posi(lines) # dbox1, tilt1 = box2dumpbox(orig, box) # print(dbox - dbox1) # print(tilt - tilt1) # print(orig) # print(box) # np.savetxt('tmp.out', posi - orig, fmt='%.6f') # print(system_data(lines)) lines = load_file("conf_unfold.dump", begin=0, step=1) al = split_traj(lines) s = system_data(lines, ["O", "H"]) # l = np.linalg.norm(s['cells'][1],axis=1) # p = s['coords'][0] + l # np.savetxt('p',p,fmt='%1.10f')