import glob
import json
import os
import re
import dpdata
import numpy as np
from ase.lattice.cubic import BodyCenteredCubic as bcc
from ase.lattice.cubic import FaceCenteredCubic as fcc
from monty.serialization import dumpfn, loadfn
import dpgen.auto_test.lib.abacus as abacus
import dpgen.auto_test.lib.vasp as vasp
from dpgen import dlog
from dpgen.auto_test.Property import Property
from dpgen.auto_test.refine import make_refine
from dpgen.auto_test.reproduce import make_repro, post_repro
[docs]
class Gamma(Property):
"""Calculation of common gamma lines for bcc and fcc."""
def __init__(self, parameter, inter_param=None):
parameter["reproduce"] = parameter.get("reproduce", False)
self.reprod = parameter["reproduce"]
if not self.reprod:
if not ("init_from_suffix" in parameter and "output_suffix" in parameter):
self.miller_index = parameter["miller_index"]
self.displace_direction = parameter["displace_direction"]
self.lattice_type = parameter["lattice_type"]
parameter["supercell_size"] = parameter.get("supercell_size", (1, 1, 5))
self.supercell_size = parameter["supercell_size"]
parameter["min_vacuum_size"] = parameter.get("min_vacuum_size", 20)
self.min_vacuum_size = parameter["min_vacuum_size"]
parameter["add_fix"] = parameter.get(
"add_fix", ["true", "true", "false"]
) # standard method
self.add_fix = parameter["add_fix"]
parameter["n_steps"] = parameter.get("n_steps", 10)
self.n_steps = parameter["n_steps"]
self.atom_num = None
parameter["cal_type"] = parameter.get("cal_type", "relaxation")
self.cal_type = parameter["cal_type"]
default_cal_setting = {
"relax_pos": True,
"relax_shape": False,
"relax_vol": False,
}
if "cal_setting" not in parameter:
parameter["cal_setting"] = default_cal_setting
else:
if "relax_pos" not in parameter["cal_setting"]:
parameter["cal_setting"]["relax_pos"] = default_cal_setting[
"relax_pos"
]
if "relax_shape" not in parameter["cal_setting"]:
parameter["cal_setting"]["relax_shape"] = default_cal_setting[
"relax_shape"
]
if "relax_vol" not in parameter["cal_setting"]:
parameter["cal_setting"]["relax_vol"] = default_cal_setting[
"relax_vol"
]
self.cal_setting = parameter["cal_setting"]
else:
parameter["cal_type"] = "static"
self.cal_type = parameter["cal_type"]
default_cal_setting = {
"relax_pos": False,
"relax_shape": False,
"relax_vol": False,
}
if "cal_setting" not in parameter:
parameter["cal_setting"] = default_cal_setting
else:
if "relax_pos" not in parameter["cal_setting"]:
parameter["cal_setting"]["relax_pos"] = default_cal_setting[
"relax_pos"
]
if "relax_shape" not in parameter["cal_setting"]:
parameter["cal_setting"]["relax_shape"] = default_cal_setting[
"relax_shape"
]
if "relax_vol" not in parameter["cal_setting"]:
parameter["cal_setting"]["relax_vol"] = default_cal_setting[
"relax_vol"
]
self.cal_setting = parameter["cal_setting"]
parameter["init_from_suffix"] = parameter.get("init_from_suffix", "00")
self.init_from_suffix = parameter["init_from_suffix"]
self.parameter = parameter
self.inter_param = inter_param if inter_param is not None else {"type": "vasp"}
[docs]
def make_confs(self, path_to_work, path_to_equi, refine=False):
from pymatgen.core.structure import Structure
path_to_work = os.path.abspath(path_to_work)
if os.path.exists(path_to_work):
dlog.warning(f"{path_to_work} already exists")
else:
os.makedirs(path_to_work)
path_to_equi = os.path.abspath(path_to_equi)
if "start_confs_path" in self.parameter and os.path.exists(
self.parameter["start_confs_path"]
):
init_path_list = glob.glob(
os.path.join(self.parameter["start_confs_path"], "*")
)
struct_init_name_list = []
for ii in init_path_list:
struct_init_name_list.append(ii.split("/")[-1])
struct_output_name = path_to_work.split("/")[-2]
assert struct_output_name in struct_init_name_list
path_to_equi = os.path.abspath(
os.path.join(
self.parameter["start_confs_path"],
struct_output_name,
"relaxation",
"relax_task",
)
)
task_list = []
cwd = os.getcwd()
if self.reprod:
print("gamma line reproduce starts")
if "init_data_path" not in self.parameter:
raise RuntimeError("please provide the initial data path to reproduce")
init_data_path = os.path.abspath(self.parameter["init_data_path"])
task_list = make_repro(
init_data_path,
self.init_from_suffix,
path_to_work,
self.parameter.get("reprod_last_frame", True),
)
os.chdir(cwd)
else:
if refine:
print("gamma line refine starts")
task_list = make_refine(
self.parameter["init_from_suffix"],
self.parameter["output_suffix"],
path_to_work,
)
os.chdir(cwd)
# record miller
init_from_path = re.sub(
self.parameter["output_suffix"][::-1],
self.parameter["init_from_suffix"][::-1],
path_to_work[::-1],
count=1,
)[::-1]
task_list_basename = list(map(os.path.basename, task_list))
for ii in task_list_basename:
init_from_task = os.path.join(init_from_path, ii)
output_task = os.path.join(path_to_work, ii)
os.chdir(output_task)
if os.path.isfile("miller.json"):
os.remove("miller.json")
if os.path.islink("miller.json"):
os.remove("miller.json")
os.symlink(
os.path.relpath(os.path.join(init_from_task, "miller.json")),
"miller.json",
)
os.chdir(cwd)
else:
if self.inter_param["type"] == "abacus":
CONTCAR = abacus.final_stru(path_to_equi)
POSCAR = "STRU"
else:
CONTCAR = "CONTCAR"
POSCAR = "POSCAR"
equi_contcar = os.path.join(path_to_equi, CONTCAR)
if not os.path.exists(equi_contcar):
raise RuntimeError("please do relaxation first")
print(
"we now only support gamma line calculation for BCC and FCC metals"
)
print(
"supported slip systems are planes/direction: 100/010, 110/111, 111/110, 111/112, 112/111, and 123/111"
)
if self.inter_param["type"] == "abacus":
stru = dpdata.System(equi_contcar, fmt="stru")
stru.to("contcar", "CONTCAR.tmp")
ptypes = vasp.get_poscar_types("CONTCAR.tmp")
ss = Structure.from_file("CONTCAR.tmp")
os.remove("CONTCAR.tmp")
else:
ptypes = vasp.get_poscar_types(equi_contcar)
# read structure from relaxed CONTCAR
ss = Structure.from_file(equi_contcar)
# rewrite new CONTCAR with direct coords
os.chdir(path_to_equi)
ss.to("CONTCAR.direct", "POSCAR")
# re-read new CONTCAR
ss = Structure.from_file("CONTCAR.direct")
relax_a = ss.lattice.a
relax_b = ss.lattice.b
relax_c = ss.lattice.c
# gen initial slab
slab = self.__gen_slab_ase(
symbol=ptypes[0], lat_param=[relax_a, relax_b, relax_c]
)
# define displace vectors
disp_vector = (1 / self.supercell_size[0], 0, 0)
# displace structure
all_slabs = self.__displace_slab(slab, disp_vector=disp_vector)
self.atom_num = len(all_slabs[0].sites)
os.chdir(path_to_work)
if os.path.isfile(POSCAR):
os.remove(POSCAR)
if os.path.islink(POSCAR):
os.remove(POSCAR)
os.symlink(os.path.relpath(equi_contcar), POSCAR)
# task_poscar = os.path.join(output, 'POSCAR')
for ii in range(len(all_slabs)):
output_task = os.path.join(path_to_work, "task.%06d" % ii)
os.makedirs(output_task, exist_ok=True)
os.chdir(output_task)
for jj in ["INCAR", "POTCAR", POSCAR, "conf.lmp", "in.lammps"]:
if os.path.exists(jj):
os.remove(jj)
task_list.append(output_task)
# print("# %03d generate " % ii, output_task)
print(
"# %03d generate " % ii,
output_task,
" \t %d atoms" % self.atom_num,
)
# make confs
all_slabs[ii].to("POSCAR.tmp", "POSCAR")
vasp.regulate_poscar("POSCAR.tmp", "POSCAR")
vasp.sort_poscar("POSCAR", "POSCAR", ptypes)
if self.inter_param["type"] == "abacus":
abacus.poscar2stru("POSCAR", self.inter_param, "STRU")
os.remove("POSCAR")
# vasp.perturb_xz('POSCAR', 'POSCAR', self.pert_xz)
# record miller
dumpfn(self.miller_index, "miller.json")
os.chdir(cwd)
return task_list
[docs]
@staticmethod
def centralize_slab(slab) -> None:
z_pos_list = list(set([site.position[2] for site in slab]))
z_pos_list.sort()
central_atoms = (z_pos_list[-1] - z_pos_list[0]) / 2
central_cell = slab.cell[2][2] / 2
disp_length = central_cell - central_atoms
for site in slab:
site.position[2] += disp_length
[docs]
def return_direction(self):
miller_str = ""
direct_str = ""
for ii in range(len(self.miller_index)):
miller_str += str(self.miller_index[ii])
for ii in range(len(self.displace_direction)):
direct_str += str(self.displace_direction[ii])
search_key = miller_str + "/" + direct_str
# define specific cell vectors
dict_directions = {
"100/010": [(0, 1, 0), (0, 0, 1), (1, 0, 0)],
"110/111": [(-1, 1, 1), (1, -1, 1), (1, 1, 0)],
"111/110": [(-1, 1, 0), (-1, -1, 2), (1, 1, 1)],
"111/112": [(1, 1, -2), (-1, 1, 0), (1, 1, 1)],
"112/111": [(-1, -1, 1), (1, -1, 0), (1, 1, 2)],
"123/111": [(-1, -1, 1), (2, -1, 0), (1, 2, 3)],
}
try:
directions = dict_directions[search_key]
except KeyError:
raise RuntimeError(
f"Unsupported input combination of miller index and displacement direction: "
f"{miller_str}:{direct_str}"
)
return directions
def __gen_slab_ase(self, symbol, lat_param):
from pymatgen.io.ase import AseAtomsAdaptor
if not self.lattice_type:
raise RuntimeError("Error! Please provide the input lattice type!")
elif self.lattice_type == "bcc":
slab_ase = bcc(
symbol=symbol,
size=self.supercell_size,
latticeconstant=lat_param[0],
directions=self.return_direction(),
)
elif self.lattice_type == "fcc":
slab_ase = fcc(
symbol=symbol,
size=self.supercell_size,
latticeconstant=lat_param[0],
directions=self.return_direction(),
)
elif self.lattice_type == "hcp":
pass
else:
raise RuntimeError(f"unsupported lattice type: {self.lattice_type}")
self.centralize_slab(slab_ase)
if self.min_vacuum_size > 0:
slab_ase.center(vacuum=self.min_vacuum_size / 2, axis=2)
slab_pymatgen = AseAtomsAdaptor.get_structure(slab_ase)
return slab_pymatgen
# leave this function to later use
# def __gen_slab_pmg(self,
# pmg_struc):
# slabGen = SlabGenerator(pmg_struc, miller_index=self.miller_index,
# min_slab_size=self.supercell_size[2],
# min_vacuum_size=self.min_vacuum_size,
# center_slab=True, in_unit_planes=True, lll_reduce=False,
# primitive=True, max_normal_search=5)
# slab_pmg = slabGen.get_slab()
# slab_pmg.make_supercell(scaling_matrix=[self.supercell_size[0],self.supercell_size[1],1])
# return slab_pmg
def __displace_slab(self, slab, disp_vector):
# return a list of displaced slab objects
all_slabs = [slab.copy()]
for ii in list(range(self.n_steps)):
frac_disp = 1 / self.n_steps
unit_vector = frac_disp * np.array(disp_vector)
# return list of atoms number to be displaced which above 0.5 z
disp_atoms_list = np.where(slab.frac_coords[:, 2] > 0.5)[0]
slab.translate_sites(
indices=disp_atoms_list,
vector=unit_vector,
frac_coords=True,
to_unit_cell=True,
)
all_slabs.append(slab.copy())
return all_slabs
def __poscar_fix(self, poscar) -> None:
# add position fix condition of x and y in POSCAR
insert_pos = -self.atom_num
fix_dict = {"true": "F", "false": "T"}
add_fix_str = (
" "
+ fix_dict[self.add_fix[0]]
+ " "
+ fix_dict[self.add_fix[1]]
+ " "
+ fix_dict[self.add_fix[2]]
+ "\n"
)
with open(poscar) as fin1:
contents = fin1.readlines()
contents.insert(insert_pos - 1, "Selective dynamics\n")
for ii in range(insert_pos, 0, 1):
contents[ii] = contents[ii].replace("\n", "")
contents[ii] += add_fix_str
with open(poscar, "w") as fin2:
for ii in range(len(contents)):
fin2.write(contents[ii])
def __stru_fix(self, stru) -> None:
fix_dict = {"true": True, "false": False}
fix_xyz = [fix_dict[i] for i in self.addfix]
abacus.stru_fix_atom(stru, fix_atom=fix_xyz)
def __inLammpes_fix(self, inLammps) -> None:
# add position fix condition of x and y of in.lammps
fix_dict = {"true": "0", "false": "NULL"}
add_fix_str = (
"fix 1 all setforce"
+ " "
+ fix_dict[self.add_fix[0]]
+ " "
+ fix_dict[self.add_fix[1]]
+ " "
+ fix_dict[self.add_fix[2]]
+ "\n"
)
with open(inLammps) as fin1:
contents = fin1.readlines()
for ii in range(len(contents)):
upper = re.search(r"variable N equal count\(all\)", contents[ii])
lower = re.search("min_style cg", contents[ii])
if lower:
lower_id = ii
# print(lower_id)
elif upper:
upper_id = ii
# print(upper_id)
del contents[lower_id + 1 : upper_id - 1]
contents.insert(lower_id + 1, add_fix_str)
with open(inLammps, "w") as fin2:
for ii in range(len(contents)):
fin2.write(contents[ii])
[docs]
def post_process(self, task_list):
if self.add_fix:
count = 0
for ii in task_list:
count += 1
inter = os.path.join(ii, "inter.json")
poscar = os.path.join(ii, "POSCAR")
calc_type = loadfn(inter)["type"]
if calc_type == "vasp":
self.__poscar_fix(poscar)
elif calc_type == "abacus":
self.__stru_fix(os.path.join(ii, "STRU"))
else:
inLammps = os.path.join(ii, "in.lammps")
if count == 1:
self.__inLammpes_fix(inLammps)
[docs]
def task_type(self):
return self.parameter["type"]
[docs]
def task_param(self):
return self.parameter
def _compute_lower(self, output_file, all_tasks, all_res):
output_file = os.path.abspath(output_file)
res_data = {}
ptr_data = os.path.dirname(output_file) + "\n"
if not self.reprod:
ptr_data += (
str(tuple(self.miller_index))
+ " plane along "
+ str(self.displace_direction)
)
ptr_data += "No_task: \tDisplacement \tStacking_Fault_E(J/m^2) EpA(eV) slab_equi_EpA(eV)\n"
all_tasks.sort()
task_result_slab_equi = loadfn(
os.path.join(all_tasks[0], "result_task.json")
)
for ii in all_tasks:
task_result = loadfn(os.path.join(ii, "result_task.json"))
natoms = np.sum(task_result["atom_numbs"])
epa = task_result["energies"][-1] / natoms
equi_epa_slab = task_result_slab_equi["energies"][-1] / natoms
AA = np.linalg.norm(
np.cross(task_result["cells"][0][0], task_result["cells"][0][1])
)
equi_path = os.path.abspath(
os.path.join(
os.path.dirname(output_file), "../relaxation/relax_task"
)
)
equi_result = loadfn(os.path.join(equi_path, "result.json"))
equi_epa = equi_result["energies"][-1] / np.sum(
equi_result["atom_numbs"]
)
structure_dir = os.path.basename(ii)
Cf = 1.60217657e-16 / 1e-20 * 0.001
sfe = (
(
task_result["energies"][-1]
- task_result_slab_equi["energies"][-1]
)
/ AA
* Cf
)
miller_index = loadfn(os.path.join(ii, "miller.json"))
ptr_data += "%-25s %7.2f %7.3f %8.3f %8.3f\n" % (
str(miller_index) + "-" + structure_dir + ":",
int(ii[-4:]) / self.n_steps,
sfe,
epa,
equi_epa_slab,
)
res_data[int(ii[-4:]) / self.n_steps] = [sfe, epa, equi_epa]
else:
if "init_data_path" not in self.parameter:
raise RuntimeError("please provide the initial data path to reproduce")
init_data_path = os.path.abspath(self.parameter["init_data_path"])
res_data, ptr_data = post_repro(
init_data_path,
self.parameter["init_from_suffix"],
all_tasks,
ptr_data,
self.parameter.get("reprod_last_frame", True),
)
with open(output_file, "w") as fp:
json.dump(res_data, fp, indent=4)
return res_data, ptr_data