import glob
import os
import numpy as np
from monty.serialization import loadfn
import dpgen.auto_test.lib.abacus as abacus
[docs]
def make_repro(
inter_param, init_data_path, init_from_suffix, path_to_work, reprod_last_frame=True
):
path_to_work = os.path.abspath(path_to_work)
property_type = path_to_work.split("/")[-1].split("_")[0]
init_data_path = os.path.join(
init_data_path, "*", property_type + "_" + init_from_suffix
)
init_data_path_list = glob.glob(init_data_path)
init_data_path_list.sort()
cwd = os.getcwd()
struct_init_name_list = []
for ii in init_data_path_list:
struct_init_name_list.append(ii.split("/")[-2])
struct_output_name = path_to_work.split("/")[-2]
assert struct_output_name in struct_init_name_list
for idx, ii in enumerate(struct_init_name_list):
if ii == struct_output_name:
label = idx
init_data_path_todo = init_data_path_list[label]
init_data_task_todo = glob.glob(
os.path.join(init_data_path_todo, "task.[0-9]*[0-9]")
)
assert (
len(init_data_task_todo) > 0
), "There is no task in previous calculations path"
init_data_task_todo.sort()
task_list = []
task_num = 0
if property_type == "interstitial":
if os.path.exists(os.path.join(path_to_work, "element.out")):
os.remove(os.path.join(path_to_work, "element.out"))
fout_element = open(os.path.join(path_to_work, "element.out"), "a+")
fin_element = open(os.path.join(init_data_path_todo, "element.out"))
for ii in init_data_task_todo:
# get frame number
task_result = loadfn(os.path.join(ii, "result_task.json"))
if reprod_last_frame:
nframe = 1
else:
nframe = len(task_result["energies"])
if property_type == "interstitial":
insert_element = fin_element.readline().split()[0]
for jj in range(nframe):
if property_type == "interstitial":
print(insert_element, file=fout_element)
output_task = os.path.join(path_to_work, "task.%06d" % task_num)
task_num += 1
task_list.append(output_task)
os.makedirs(output_task, exist_ok=True)
os.chdir(output_task)
# clear dir
for kk in [
"INCAR",
"POTCAR",
"POSCAR.orig",
"POSCAR",
"conf.lmp",
"in.lammps",
"STRU",
]:
if os.path.exists(kk):
os.remove(kk)
# make conf
if reprod_last_frame:
task_result.to("vasp/poscar", "POSCAR", frame_idx=-1)
else:
task_result.to("vasp/poscar", "POSCAR", frame_idx=jj)
if inter_param["type"] == "abacus":
abacus.poscar2stru("POSCAR", inter_param, "STRU")
os.remove("POSCAR")
os.chdir(cwd)
if property_type == "interstitial":
fout_element.close()
fin_element.close()
return task_list
[docs]
def post_repro(
init_data_path, init_from_suffix, all_tasks, ptr_data, reprod_last_frame=True
):
ptr_data += "Reproduce: Initial_path Init_E(eV/atom) Reprod_E(eV/atom) Difference(eV/atom)\n"
struct_output_name = all_tasks[0].split("/")[-3]
property_type = all_tasks[0].split("/")[-2].split("_")[0]
init_data_path = os.path.join(
init_data_path, "*", property_type + "_" + init_from_suffix
)
init_data_path_list = glob.glob(init_data_path)
init_data_path_list.sort()
# cwd = os.getcwd()
struct_init_name_list = []
for ii in init_data_path_list:
struct_init_name_list.append(ii.split("/")[-2])
assert struct_output_name in struct_init_name_list
for idx, ii in enumerate(struct_init_name_list):
if ii == struct_output_name:
label = idx
init_data_path_todo = init_data_path_list[label]
init_data_task_todo = glob.glob(
os.path.join(init_data_path_todo, "task.[0-9]*[0-9]")
)
assert (
len(init_data_task_todo) > 0
), "There is no task in previous calculations path"
init_data_task_todo.sort()
idid = 0
init_ener_tot = []
output_ener_tot = []
res_data = {}
for ii in init_data_task_todo:
init_task_result = loadfn(os.path.join(ii, "result_task.json"))
if reprod_last_frame:
nframe = 1
else:
nframe = len(init_task_result["energies"])
# idid += nframe
natoms = np.sum(init_task_result["atom_numbs"])
if reprod_last_frame:
init_ener = init_task_result["energies"][-1:]
else:
init_ener = init_task_result["energies"]
init_ener_tot.extend(list(init_ener))
output_ener = []
for jj in range(idid, idid + nframe):
output_task_result = loadfn(os.path.join(all_tasks[jj], "result_task.json"))
output_epa = output_task_result["energies"] / natoms
output_ener.append(output_epa)
output_ener_tot.extend(output_task_result["energies"])
init_epa = init_ener[jj - idid] / natoms
ptr_data += f"{ii} {init_epa:7.3f} {output_epa:7.3f} {output_epa - init_epa:7.3f}\n"
idid += nframe
output_ener = np.array(output_ener)
output_ener = np.reshape(output_ener, [-1, 1])
init_ener = np.reshape(init_ener, [-1, 1]) / natoms
if reprod_last_frame:
error_start = 0
else:
error_start = 1
output_ener -= output_ener[-1] - init_ener[-1]
diff = output_ener - init_ener
diff = diff[error_start:]
error = np.linalg.norm(diff) / np.sqrt(np.size(output_ener) - error_start)
res_data[ii] = {"nframes": len(init_ener), "error": error}
if not len(init_ener_tot) == len(output_ener_tot):
raise RuntimeError("reproduce tasks not equal to init")
# for ii in range(len(lmp_ener_tot)):
# ptr_data += '%7.3f %7.3f %7.3f\n' % (vasp_ener_tot[ii], lmp_ener_tot[ii],
# lmp_ener_tot[ii] - vasp_ener_tot[ii])
return res_data, ptr_data