Source code for dpgen.auto_test.Surface

import glob
import json
import os
import re

import dpdata
import numpy as np
from monty.serialization import dumpfn, loadfn
from pymatgen.core.structure import Structure
from pymatgen.core.surface import generate_all_slabs

import dpgen.auto_test.lib.abacus as abacus
import dpgen.auto_test.lib.vasp as vasp
from dpgen import dlog
from dpgen.auto_test.Property import Property
from dpgen.auto_test.refine import make_refine
from dpgen.auto_test.reproduce import make_repro, post_repro


[docs] class Surface(Property): def __init__(self, parameter, inter_param=None): parameter["reproduce"] = parameter.get("reproduce", False) self.reprod = parameter["reproduce"] if not self.reprod: if not ("init_from_suffix" in parameter and "output_suffix" in parameter): self.min_slab_size = parameter["min_slab_size"] self.min_vacuum_size = parameter["min_vacuum_size"] parameter["pert_xz"] = parameter.get("pert_xz", 0.01) self.pert_xz = parameter["pert_xz"] default_max_miller = 2 parameter["max_miller"] = parameter.get( "max_miller", default_max_miller ) self.miller = parameter["max_miller"] parameter["cal_type"] = parameter.get("cal_type", "relaxation") self.cal_type = parameter["cal_type"] default_cal_setting = { "relax_pos": True, "relax_shape": True, "relax_vol": False, } if "cal_setting" not in parameter: parameter["cal_setting"] = default_cal_setting else: if "relax_pos" not in parameter["cal_setting"]: parameter["cal_setting"]["relax_pos"] = default_cal_setting[ "relax_pos" ] if "relax_shape" not in parameter["cal_setting"]: parameter["cal_setting"]["relax_shape"] = default_cal_setting[ "relax_shape" ] if "relax_vol" not in parameter["cal_setting"]: parameter["cal_setting"]["relax_vol"] = default_cal_setting[ "relax_vol" ] self.cal_setting = parameter["cal_setting"] else: parameter["cal_type"] = "static" self.cal_type = parameter["cal_type"] default_cal_setting = { "relax_pos": False, "relax_shape": False, "relax_vol": False, } if "cal_setting" not in parameter: parameter["cal_setting"] = default_cal_setting else: if "relax_pos" not in parameter["cal_setting"]: parameter["cal_setting"]["relax_pos"] = default_cal_setting[ "relax_pos" ] if "relax_shape" not in parameter["cal_setting"]: parameter["cal_setting"]["relax_shape"] = default_cal_setting[ "relax_shape" ] if "relax_vol" not in parameter["cal_setting"]: parameter["cal_setting"]["relax_vol"] = default_cal_setting[ "relax_vol" ] self.cal_setting = parameter["cal_setting"] parameter["init_from_suffix"] = parameter.get("init_from_suffix", "00") self.init_from_suffix = parameter["init_from_suffix"] self.parameter = parameter self.inter_param = inter_param if inter_param is not None else {"type": "vasp"}
[docs] def make_confs(self, path_to_work, path_to_equi, refine=False): path_to_work = os.path.abspath(path_to_work) if os.path.exists(path_to_work): dlog.warning("%s already exists" % path_to_work) else: os.makedirs(path_to_work) path_to_equi = os.path.abspath(path_to_equi) if "start_confs_path" in self.parameter and os.path.exists( self.parameter["start_confs_path"] ): init_path_list = glob.glob( os.path.join(self.parameter["start_confs_path"], "*") ) struct_init_name_list = [] for ii in init_path_list: struct_init_name_list.append(ii.split("/")[-1]) struct_output_name = path_to_work.split("/")[-2] assert struct_output_name in struct_init_name_list path_to_equi = os.path.abspath( os.path.join( self.parameter["start_confs_path"], struct_output_name, "relaxation", "relax_task", ) ) task_list = [] cwd = os.getcwd() if self.reprod: print("surface reproduce starts") if "init_data_path" not in self.parameter: raise RuntimeError("please provide the initial data path to reproduce") init_data_path = os.path.abspath(self.parameter["init_data_path"]) task_list = make_repro( self.inter_param, init_data_path, self.init_from_suffix, path_to_work, self.parameter.get("reprod_last_frame", True), ) os.chdir(cwd) else: if refine: print("surface refine starts") task_list = make_refine( self.parameter["init_from_suffix"], self.parameter["output_suffix"], path_to_work, ) os.chdir(cwd) # record miller init_from_path = re.sub( self.parameter["output_suffix"][::-1], self.parameter["init_from_suffix"][::-1], path_to_work[::-1], count=1, )[::-1] task_list_basename = list(map(os.path.basename, task_list)) for ii in task_list_basename: init_from_task = os.path.join(init_from_path, ii) output_task = os.path.join(path_to_work, ii) os.chdir(output_task) if os.path.isfile("miller.json"): os.remove("miller.json") if os.path.islink("miller.json"): os.remove("miller.json") os.symlink( os.path.relpath(os.path.join(init_from_task, "miller.json")), "miller.json", ) os.chdir(cwd) else: if self.inter_param["type"] == "abacus": CONTCAR = abacus.final_stru(path_to_equi) POSCAR = "STRU" else: CONTCAR = "CONTCAR" POSCAR = "POSCAR" equi_contcar = os.path.join(path_to_equi, CONTCAR) if not os.path.exists(equi_contcar): raise RuntimeError("please do relaxation first") if self.inter_param["type"] == "abacus": stru = dpdata.System(equi_contcar, fmt="stru") stru.to("contcar", "CONTCAR.tmp") ptypes = vasp.get_poscar_types("CONTCAR.tmp") ss = Structure.from_file("CONTCAR.tmp") os.remove("CONTCAR.tmp") else: ptypes = vasp.get_poscar_types(equi_contcar) # gen structure ss = Structure.from_file(equi_contcar) # gen slabs all_slabs = generate_all_slabs( ss, self.miller, self.min_slab_size, self.min_vacuum_size ) os.chdir(path_to_work) if os.path.isfile(POSCAR): os.remove(POSCAR) if os.path.islink(POSCAR): os.remove(POSCAR) os.symlink(os.path.relpath(equi_contcar), POSCAR) # task_poscar = os.path.join(output, 'POSCAR') for ii in range(len(all_slabs)): output_task = os.path.join(path_to_work, "task.%06d" % ii) os.makedirs(output_task, exist_ok=True) os.chdir(output_task) for jj in [ "INCAR", "POTCAR", "POSCAR", "conf.lmp", "in.lammps", "STRU", ]: if os.path.exists(jj): os.remove(jj) task_list.append(output_task) print( "# %03d generate " % ii, output_task, " \t %d atoms" % len(all_slabs[ii].sites), ) # make confs all_slabs[ii].to("POSCAR.tmp", "POSCAR") vasp.regulate_poscar("POSCAR.tmp", "POSCAR") vasp.sort_poscar("POSCAR", "POSCAR", ptypes) vasp.perturb_xz("POSCAR", "POSCAR", self.pert_xz) if self.inter_param["type"] == "abacus": abacus.poscar2stru("POSCAR", self.inter_param, "STRU") os.remove("POSCAR") # record miller dumpfn(all_slabs[ii].miller_index, "miller.json") os.chdir(cwd) return task_list
[docs] def post_process(self, task_list): pass
[docs] def task_type(self): return self.parameter["type"]
[docs] def task_param(self): return self.parameter
def _compute_lower(self, output_file, all_tasks, all_res): output_file = os.path.abspath(output_file) res_data = {} ptr_data = os.path.dirname(output_file) + "\n" if not self.reprod: ptr_data += "Miller_Indices: \tSurf_E(J/m^2) EpA(eV) equi_EpA(eV)\n" for ii in all_tasks: task_result = loadfn(os.path.join(ii, "result_task.json")) natoms = np.sum(task_result["atom_numbs"]) epa = task_result["energies"][-1] / natoms AA = np.linalg.norm( np.cross(task_result["cells"][0][0], task_result["cells"][0][1]) ) equi_path = os.path.abspath( os.path.join( os.path.dirname(output_file), "../relaxation/relax_task" ) ) equi_result = loadfn(os.path.join(equi_path, "result.json")) equi_epa = equi_result["energies"][-1] / np.sum( equi_result["atom_numbs"] ) structure_dir = os.path.basename(ii) Cf = 1.60217657e-16 / (1e-20 * 2) * 0.001 evac = (task_result["energies"][-1] - equi_epa * natoms) / AA * Cf miller_index = loadfn(os.path.join(ii, "miller.json")) ptr_data += "%-25s %7.3f %8.3f %8.3f\n" % ( str(miller_index) + "-" + structure_dir + ":", evac, epa, equi_epa, ) res_data[str(miller_index) + "-" + structure_dir] = [ evac, epa, equi_epa, ] else: if "init_data_path" not in self.parameter: raise RuntimeError("please provide the initial data path to reproduce") init_data_path = os.path.abspath(self.parameter["init_data_path"]) res_data, ptr_data = post_repro( init_data_path, self.parameter["init_from_suffix"], all_tasks, ptr_data, self.parameter.get("reprod_last_frame", True), ) with open(output_file, "w") as fp: json.dump(res_data, fp, indent=4) return res_data, ptr_data