Source code for dpgen.auto_test.Lammps
import os
import warnings
from monty.serialization import dumpfn, loadfn
import dpgen.auto_test.lib.lammps as lammps
from dpgen import dlog
from dpgen.auto_test.lib.lammps import (
inter_deepmd,
inter_eam_alloy,
inter_eam_fs,
inter_meam,
)
from dpgen.auto_test.Task import Task
supported_inter = ["deepmd", "meam", "eam_fs", "eam_alloy"]
[docs]
class Lammps(Task):
def __init__(self, inter_parameter, path_to_poscar):
self.inter = inter_parameter
self.inter_type = inter_parameter["type"]
self.type_map = inter_parameter["type_map"]
self.in_lammps = inter_parameter.get("in_lammps", "auto")
if self.inter_type == "meam":
self.model = list(map(os.path.abspath, inter_parameter["model"]))
else:
self.model = os.path.abspath(inter_parameter["model"])
self.path_to_poscar = path_to_poscar
assert self.inter_type in supported_inter
self.set_inter_type_func()
[docs]
def set_inter_type_func(self):
if self.inter_type == "deepmd":
self.inter_func = inter_deepmd
elif self.inter_type == "meam":
self.inter_func = inter_meam
elif self.inter_type == "eam_fs":
self.inter_func = inter_eam_fs
else:
self.inter_func = inter_eam_alloy
[docs]
def set_model_param(self):
if self.inter_type == "deepmd":
model_name = os.path.basename(self.model)
deepmd_version = self.inter.get("deepmd_version", "1.2.0")
self.model_param = {
"model_name": [model_name],
"param_type": self.type_map,
"deepmd_version": deepmd_version,
}
elif self.inter_type == "meam":
model_name = list(map(os.path.basename, self.model))
self.model_param = {"model_name": [model_name], "param_type": self.type_map}
else:
model_name = os.path.basename(self.model)
self.model_param = {"model_name": [model_name], "param_type": self.type_map}
[docs]
def make_potential_files(self, output_dir):
cwd = os.getcwd()
if self.inter_type == "meam":
model_lib = os.path.basename(self.model[0])
model_file = os.path.basename(self.model[1])
os.chdir(os.path.join(output_dir, "../"))
if os.path.islink(model_lib):
link_lib = os.readlink(model_lib)
if not os.path.abspath(link_lib) == self.model[0]:
os.remove(model_lib)
os.symlink(os.path.relpath(self.model[0]), model_lib)
else:
os.symlink(os.path.relpath(self.model[0]), model_lib)
if os.path.islink(model_file):
link_file = os.readlink(model_file)
if not os.path.abspath(link_file) == self.model[1]:
os.remove(model_file)
os.symlink(os.path.relpath(self.model[1]), model_file)
else:
os.symlink(os.path.relpath(self.model[1]), model_file)
os.chdir(output_dir)
if not os.path.islink(model_lib):
os.symlink(os.path.join("..", model_lib), model_lib)
elif not os.path.join("..", model_lib) == os.readlink(model_lib):
os.remove(model_lib)
os.symlink(os.path.join("..", model_lib), model_lib)
if not os.path.islink(model_file):
os.symlink(os.path.join("..", model_file), model_file)
elif not os.path.join("..", model_file) == os.readlink(model_file):
os.remove(model_file)
os.symlink(os.path.join("..", model_file), model_file)
os.chdir(cwd)
else:
model_file = os.path.basename(self.model)
os.chdir(os.path.join(output_dir, "../"))
if os.path.islink(model_file):
link_file = os.readlink(model_file)
if not os.path.abspath(link_file) == self.model:
os.remove(model_file)
os.symlink(os.path.relpath(self.model), model_file)
else:
os.symlink(os.path.relpath(self.model), model_file)
os.chdir(output_dir)
if not os.path.islink(model_file):
os.symlink(os.path.join("..", model_file), model_file)
elif not os.path.join("..", model_file) == os.readlink(model_file):
os.remove(model_file)
os.symlink(os.path.join("..", model_file), model_file)
os.chdir(cwd)
dumpfn(self.inter, os.path.join(output_dir, "inter.json"), indent=4)
[docs]
def make_input_file(self, output_dir, task_type, task_param):
lammps.cvt_lammps_conf(
os.path.join(output_dir, "POSCAR"),
os.path.join(output_dir, "conf.lmp"),
lammps.element_list(self.type_map),
)
# dumpfn(task_param, os.path.join(output_dir, 'task.json'), indent=4)
etol = 0
ftol = 1e-10
maxiter = 5000
maxeval = 500000
B0 = 70
bp = 0
ntypes = len(self.type_map)
cal_type = task_param["cal_type"]
cal_setting = task_param["cal_setting"]
self.set_model_param()
# deal with user input in.lammps for relaxation
if os.path.isfile(self.in_lammps) and task_type == "relaxation":
with open(self.in_lammps) as fin:
fc = fin.read()
# user input in.lammps for property calculation
if "input_prop" in cal_setting and os.path.isfile(cal_setting["input_prop"]):
with open(os.path.abspath(cal_setting["input_prop"])) as fin:
fc = fin.read()
else:
if "etol" in cal_setting:
dlog.info(
"{} setting etol to {}".format(
self.make_input_file.__name__, cal_setting["etol"]
)
)
etol = cal_setting["etol"]
if "ftol" in cal_setting:
dlog.info(
"{} setting ftol to {}".format(
self.make_input_file.__name__, cal_setting["ftol"]
)
)
ftol = cal_setting["ftol"]
if "maxiter" in cal_setting:
dlog.info(
"{} setting maxiter to {}".format(
self.make_input_file.__name__, cal_setting["maxiter"]
)
)
maxiter = cal_setting["maxiter"]
if "maxeval" in cal_setting:
dlog.info(
"{} setting maxeval to {}".format(
self.make_input_file.__name__, cal_setting["maxeval"]
)
)
maxeval = cal_setting["maxeval"]
if cal_type == "relaxation":
relax_pos = cal_setting["relax_pos"]
relax_shape = cal_setting["relax_shape"]
relax_vol = cal_setting["relax_vol"]
if [relax_pos, relax_shape, relax_vol] == [True, False, False]:
fc = lammps.make_lammps_equi(
"conf.lmp",
self.type_map,
self.inter_func,
self.model_param,
etol,
ftol,
maxiter,
maxeval,
False,
)
elif [relax_pos, relax_shape, relax_vol] == [True, True, True]:
fc = lammps.make_lammps_equi(
"conf.lmp",
self.type_map,
self.inter_func,
self.model_param,
etol,
ftol,
maxiter,
maxeval,
True,
)
elif [relax_pos, relax_shape, relax_vol] == [
True,
True,
False,
] and not task_type == "eos":
if "scale2equi" in task_param:
scale2equi = task_param["scale2equi"]
fc = lammps.make_lammps_press_relax(
"conf.lmp",
self.type_map,
scale2equi[int(output_dir[-6:])],
self.inter_func,
self.model_param,
B0,
bp,
etol,
ftol,
maxiter,
maxeval,
)
else:
fc = lammps.make_lammps_equi(
"conf.lmp",
self.type_map,
self.inter_func,
self.model_param,
etol,
ftol,
maxiter,
maxeval,
True,
)
elif [relax_pos, relax_shape, relax_vol] == [
True,
True,
False,
] and task_type == "eos":
task_param["cal_setting"]["relax_shape"] = False
fc = lammps.make_lammps_equi(
"conf.lmp",
self.type_map,
self.inter_func,
self.model_param,
etol,
ftol,
maxiter,
maxeval,
False,
)
elif [relax_pos, relax_shape, relax_vol] == [False, False, False]:
fc = lammps.make_lammps_eval(
"conf.lmp", self.type_map, self.inter_func, self.model_param
)
else:
raise RuntimeError("not supported calculation setting for LAMMPS")
elif cal_type == "static":
fc = lammps.make_lammps_eval(
"conf.lmp", self.type_map, self.inter_func, self.model_param
)
else:
raise RuntimeError("not supported calculation type for LAMMPS")
dumpfn(task_param, os.path.join(output_dir, "task.json"), indent=4)
in_lammps_not_link_list = ["eos"]
if task_type not in in_lammps_not_link_list:
with open(os.path.join(output_dir, "../in.lammps"), "w") as fp:
fp.write(fc)
cwd = os.getcwd()
os.chdir(output_dir)
if not (os.path.islink("in.lammps") or os.path.isfile("in.lammps")):
os.symlink("../in.lammps", "in.lammps")
else:
os.remove("in.lammps")
os.symlink("../in.lammps", "in.lammps")
os.chdir(cwd)
else:
with open(os.path.join(output_dir, "in.lammps"), "w") as fp:
fp.write(fc)
[docs]
def compute(self, output_dir):
log_lammps = os.path.join(output_dir, "log.lammps")
dump_lammps = os.path.join(output_dir, "dump.relax")
if not os.path.isfile(log_lammps):
warnings.warn("cannot find log.lammps in " + output_dir + " skip")
return None
if not os.path.isfile(dump_lammps):
warnings.warn("cannot find dump.relax in " + output_dir + " skip")
return None
else:
box = []
coord = []
vol = []
energy = []
force = []
virial = []
stress = []
with open(dump_lammps) as fin:
dump = fin.read().split("\n")
dumptime = []
for idx, ii in enumerate(dump):
if ii == "ITEM: TIMESTEP":
box.append([])
coord.append([])
force.append([])
dumptime.append(int(dump[idx + 1]))
natom = int(dump[idx + 3])
xlo_bound = float(dump[idx + 5].split()[0])
xhi_bound = float(dump[idx + 5].split()[1])
xy = float(dump[idx + 5].split()[2])
ylo_bound = float(dump[idx + 6].split()[0])
yhi_bound = float(dump[idx + 6].split()[1])
xz = float(dump[idx + 6].split()[2])
zlo = float(dump[idx + 7].split()[0])
zhi = float(dump[idx + 7].split()[1])
yz = float(dump[idx + 7].split()[2])
xx = (
xhi_bound
- max([0, xy, xz, xy + xz])
- (xlo_bound - min([0, xy, xz, xy + xz]))
)
yy = yhi_bound - max([0, yz]) - (ylo_bound - min([0, yz]))
zz = zhi - zlo
box[-1].append([xx, 0.0, 0.0])
box[-1].append([xy, yy, 0.0])
box[-1].append([xz, yz, zz])
vol.append(xx * yy * zz)
type_list = []
for jj in range(natom):
type_list.append(int(dump[idx + 9 + jj].split()[1]) - 1)
if "xs ys zs" in dump[idx + 8]:
a_x = (
float(dump[idx + 9 + jj].split()[2]) * xx
+ float(dump[idx + 9 + jj].split()[3]) * xy
+ float(dump[idx + 9 + jj].split()[4]) * xz
)
a_y = (
float(dump[idx + 9 + jj].split()[3]) * yy
+ float(dump[idx + 9 + jj].split()[4]) * yz
)
a_z = float(dump[idx + 9 + jj].split()[4]) * zz
else:
a_x = float(dump[idx + 9 + jj].split()[2])
a_y = float(dump[idx + 9 + jj].split()[3])
a_z = float(dump[idx + 9 + jj].split()[4])
coord[-1].append([a_x, a_y, a_z])
fx = float(dump[idx + 9 + jj].split()[5])
fy = float(dump[idx + 9 + jj].split()[6])
fz = float(dump[idx + 9 + jj].split()[7])
force[-1].append([fx, fy, fz])
with open(log_lammps) as fp:
if "Total wall time:" not in fp.read():
warnings.warn("lammps not finished " + log_lammps + " skip")
return None
else:
fp.seek(0)
lines = fp.read().split("\n")
idid = -1
for ii in dumptime:
idid += 1
for jj in lines:
line = jj.split()
if len(line) and str(ii) == line[0]:
try:
[float(kk) for kk in line]
except Exception:
continue
stress.append([])
virial.append([])
energy.append(float(line[1]))
# virials = stress * vol * 1e5 *1e-30 * 1e19/1.6021766208
stress[-1].append(
[
float(line[2]) / 1000.0,
float(line[5]) / 1000.0,
float(line[6]) / 1000.0,
]
)
stress[-1].append(
[
float(line[5]) / 1000.0,
float(line[3]) / 1000.0,
float(line[7]) / 1000.0,
]
)
stress[-1].append(
[
float(line[6]) / 1000.0,
float(line[7]) / 1000.0,
float(line[4]) / 1000.0,
]
)
stress_to_virial = (
vol[idid] * 1e5 * 1e-30 * 1e19 / 1.6021766208
)
virial[-1].append(
[
float(line[2]) * stress_to_virial,
float(line[5]) * stress_to_virial,
float(line[6]) * stress_to_virial,
]
)
virial[-1].append(
[
float(line[5]) * stress_to_virial,
float(line[3]) * stress_to_virial,
float(line[7]) * stress_to_virial,
]
)
virial[-1].append(
[
float(line[6]) * stress_to_virial,
float(line[7]) * stress_to_virial,
float(line[4]) * stress_to_virial,
]
)
break
_tmp = self.type_map
dlog.debug(_tmp)
type_map_list = lammps.element_list(self.type_map)
type_map_idx = list(range(len(type_map_list)))
atom_numbs = []
for ii in type_map_idx:
count = 0
for jj in type_list:
if jj == ii:
count += 1
atom_numbs.append(count)
# d_dump = dpdata.System(dump_lammps, fmt='lammps/dump', type_map=type_map_list)
# d_dump.to('vasp/poscar', contcar, frame_idx=-1)
result_dict = {
"@module": "dpdata.system",
"@class": "LabeledSystem",
"data": {
"atom_numbs": atom_numbs,
"atom_names": type_map_list,
"atom_types": {
"@module": "numpy",
"@class": "array",
"dtype": "int64",
"data": type_list,
},
"orig": {
"@module": "numpy",
"@class": "array",
"dtype": "int64",
"data": [0, 0, 0],
},
"cells": {
"@module": "numpy",
"@class": "array",
"dtype": "float64",
"data": box,
},
"coords": {
"@module": "numpy",
"@class": "array",
"dtype": "float64",
"data": coord,
},
"energies": {
"@module": "numpy",
"@class": "array",
"dtype": "float64",
"data": energy,
},
"forces": {
"@module": "numpy",
"@class": "array",
"dtype": "float64",
"data": force,
},
"virials": {
"@module": "numpy",
"@class": "array",
"dtype": "float64",
"data": virial,
},
"stress": {
"@module": "numpy",
"@class": "array",
"dtype": "float64",
"data": stress,
},
},
}
contcar = os.path.join(output_dir, "CONTCAR")
dumpfn(result_dict, contcar, indent=4)
d_dump = loadfn(contcar)
d_dump.to("vasp/poscar", contcar, frame_idx=-1)
return result_dict
[docs]
def forward_files(self, property_type="relaxation"):
if self.inter_type == "meam":
return ["conf.lmp", "in.lammps"] + list(map(os.path.basename, self.model))
else:
return ["conf.lmp", "in.lammps", os.path.basename(self.model)]
[docs]
def forward_common_files(self, property_type="relaxation"):
if property_type not in ["eos"]:
if self.inter_type == "meam":
return ["in.lammps"] + list(map(os.path.basename, self.model))
else:
return ["in.lammps", os.path.basename(self.model)]
else:
if self.inter_type == "meam":
return list(map(os.path.basename, self.model))
else:
return [os.path.basename(self.model)]
[docs]
def backward_files(self, property_type="relaxation"):
return ["log.lammps", "outlog", "dump.relax"]