import glob
import os
import shutil
from monty.serialization import dumpfn
from packaging.version import Version
import dpgen.auto_test.lib.abacus as abacus
import dpgen.auto_test.lib.crys as crys
import dpgen.auto_test.lib.util as util
from dpgen import dlog
from dpgen.auto_test.calculator import make_calculator
from dpgen.auto_test.lib.utils import create_path
from dpgen.auto_test.mpdb import get_structure
from dpgen.dispatcher.Dispatcher import make_submission
from dpgen.remote.decide_machine import convert_mdata
# Interaction types for which run_equi converts the machine data with the
# "model_devi" key (LAMMPS-based potentials, judging by the name).
lammps_task_type = ["deepmd", "meam", "eam_fs", "eam_alloy"]
[docs]
def make_equi(confs, inter_param, relax_param):
    """Set up one relaxation task per configuration directory.

    For every directory matched by ``confs`` this makes sure a structure file
    is present (generating a standard single-element crystal or fetching an
    ``mp-*`` entry when it is missing), creates
    ``<conf>/relaxation/relax_task``, symlinks the structure file into it and
    lets the calculator returned by ``make_calculator`` write its potential
    and input files there.

    Parameters
    ----------
    confs : iterable of str
        Glob patterns; every match is treated as a configuration directory.
    inter_param : dict
        Interaction settings. ``"type"`` is read for every configuration. The
        element list is taken from the keys of ``"type_map"`` when present,
        otherwise from the keys of ``"potcars"``. An optional ``"single"``
        entry is the index of the element used for generated crystals.
    relax_param : dict
        Relaxation settings. Modified in place: ``"cal_type"`` is set to
        ``"relaxation"`` and any of ``"relax_pos"``, ``"relax_shape"``,
        ``"relax_vol"`` missing from ``"cal_setting"`` defaults to ``True``.

    Raises
    ------
    FileNotFoundError
        If a configuration directory has no structure file (``STRU`` for the
        ``"abacus"`` type, ``POSCAR`` otherwise) after the generation steps.
    """
    dlog.debug("debug info make equi")
    if "type_map" in inter_param:
        ele_list = list(inter_param["type_map"])
    else:
        ele_list = list(inter_param["potcars"])
    dlog.debug("ele_list {}".format(":".join(ele_list)))

    # Expand the patterns into a sorted list of configuration directories.
    conf_dirs = sorted(path for conf in confs for path in glob.glob(conf))

    cwd = os.getcwd()

    # Generate POSCARs of the standard crystals for a single element.
    if len(ele_list) == 1 or "single" in inter_param:
        element_label = int(inter_param.get("single", 0))
        std_builders = {
            "std-fcc": crys.fcc1,
            "std-hcp": crys.hcp,
            "std-dhcp": crys.dhcp,
            "std-bcc": crys.bcc,
            "std-diamond": crys.diamond,
            "std-sc": crys.sc,
        }
        for ii in conf_dirs:
            os.chdir(ii)
            try:
                # normpath drops a trailing separator (glob patterns ending
                # in "/" return such paths), so the directory name is kept.
                crys_type = os.path.basename(os.path.normpath(ii))
                dlog.debug(f"crys_type: {crys_type}")
                dlog.debug(f"pwd: {os.getcwd()}")
                builder = std_builders.get(crys_type)
                if builder is not None and not os.path.exists("POSCAR"):
                    builder(ele_list[element_label]).to("POSCAR", "POSCAR")
                # Convert only when a POSCAR is actually there; directories
                # without one (e.g. mp-* not fetched yet) are handled below.
                if (
                    inter_param["type"] == "abacus"
                    and not os.path.exists("STRU")
                    and os.path.exists("POSCAR")
                ):
                    abacus.poscar2stru("POSCAR", inter_param, "STRU")
                    os.remove("POSCAR")
            finally:
                # Never leave the process in a configuration directory, even
                # when structure generation fails.
                os.chdir(cwd)

    # Make task directories like mp-xxx/relaxation/relax_task.
    task_dirs = []
    for ii in conf_dirs:
        crys_type = os.path.basename(os.path.normpath(ii))
        dlog.debug(f"crys_type: {crys_type}")
        is_abacus = inter_param["type"] == "abacus"
        conf_poscar = os.path.join(ii, "POSCAR")
        conf_stru = os.path.join(ii, "STRU")

        if "mp-" in crys_type and not os.path.exists(conf_poscar):
            get_structure(crys_type).to("POSCAR", conf_poscar)
            # Look for STRU inside the configuration directory; the working
            # directory is not the configuration directory at this point.
            if is_abacus and not os.path.exists(conf_stru):
                abacus.poscar2stru(conf_poscar, inter_param, conf_stru)
                os.remove(conf_poscar)

        struct_name = "STRU" if is_abacus else "POSCAR"
        struct_path = os.path.abspath(os.path.join(ii, struct_name))
        # Check before touching STRU so a missing structure is reported with
        # the dedicated message rather than by the backup copy failing.
        if not os.path.exists(struct_path):
            raise FileNotFoundError("no configuration for autotest")
        if is_abacus:
            shutil.copyfile(conf_stru, os.path.join(ii, "STRU.bk"))
            abacus.modify_stru_path(conf_stru, "pp_orb/")

        jr_file = os.path.join(ii, "relaxation", "jr.json")
        if os.path.exists(jr_file):
            os.remove(jr_file)

        relax_dirs = os.path.abspath(
            os.path.join(ii, "relaxation", "relax_task")
        )  # to be consistent with property in make dispatcher
        create_path(relax_dirs)
        task_dirs.append(relax_dirs)

        # Link the structure file into mp-xxx/relaxation/relax_task.
        os.chdir(relax_dirs)
        try:
            # islink also catches a dangling link, for which isfile is False
            # and os.symlink would otherwise fail with FileExistsError.
            if os.path.islink(struct_name) or os.path.isfile(struct_name):
                os.remove(struct_name)
            os.symlink(os.path.relpath(struct_path), struct_name)
        finally:
            os.chdir(cwd)
    task_dirs.sort()

    # Fill in the relaxation settings (the caller's dict is updated in place).
    relax_param["cal_type"] = "relaxation"
    cal_setting = relax_param.setdefault("cal_setting", {})
    for key in ("relax_pos", "relax_shape", "relax_vol"):
        cal_setting.setdefault(key, True)

    # Generate the task files.
    for ii in task_dirs:
        poscar = os.path.join(ii, "POSCAR")
        dlog.debug(f"task_dir {ii}")
        inter = make_calculator(inter_param, poscar)
        inter.make_potential_files(ii)
        inter.make_input_file(ii, "relaxation", relax_param)
[docs]
def run_equi(confs, inter_param, mdata):
    """Submit the relaxation task of every configuration for execution.

    Parameters
    ----------
    confs : iterable of str
        Glob patterns; each match contributes the task
        ``<match>/relaxation/relax_task``.
    inter_param : dict
        Interaction settings; ``"type"`` selects how ``mdata`` is converted
        and is passed on to ``util.get_machine_info``.
    mdata : dict
        Machine settings, converted with ``convert_mdata`` before use.

    Raises
    ------
    RuntimeError
        If the interaction type is unknown or the ``"api_version"`` found in
        the converted machine settings is older than 1.0.
    """
    # Collect the configuration directories in sorted order.
    matched = []
    for pattern in confs:
        matched += glob.glob(pattern)
    matched.sort()

    # One task per configuration, like mp-xxx/relaxation/relax_task.
    run_tasks = [
        os.path.join(os.path.join(conf_dir, "relaxation"), "relax_task")
        for conf_dir in matched
    ]

    # Pick the machine-data section that belongs to the interaction type.
    inter_type = inter_param["type"]
    if inter_type in ("vasp", "abacus"):
        task_keys = ["fp"]
    elif inter_type in lammps_task_type:
        task_keys = ["model_devi"]
    else:
        raise RuntimeError(f"unknown task {inter_type}, something wrong")
    mdata = convert_mdata(mdata, task_keys)

    # The calculator is only asked for its file lists here, so the structure
    # path handed to it is a placeholder.
    calculator = make_calculator(inter_param, "POSCAR")
    forward_files = calculator.forward_files()
    forward_common_files = calculator.forward_common_files()
    backward_files = calculator.backward_files()

    machine, resources, command, group_size = util.get_machine_info(mdata, inter_type)
    work_path = os.getcwd()
    print(f"{work_path} --> Runing... ")

    api_version = mdata.get("api_version", "1.0")
    if Version(api_version) < Version("1.0"):
        raise RuntimeError(
            f"API version {api_version} has been removed. Please upgrade to 1.0."
        )

    # Any version that passed the check above is >= 1.0.
    submission = make_submission(
        mdata_machine=machine,
        mdata_resources=resources,
        commands=[command],
        work_path=work_path,
        run_tasks=run_tasks,
        group_size=group_size,
        forward_common_files=forward_common_files,
        forward_files=forward_files,
        backward_files=backward_files,
        outlog="outlog",
        errlog="errlog",
    )
    submission.run_submission()
[docs]
def post_equi(confs, inter_param):
    """Compute and store the relaxation result of every configuration.

    Parameters
    ----------
    confs : iterable of str
        Glob patterns; each match contributes the task directory
        ``<match>/relaxation/relax_task``.
    inter_param : dict
        Interaction settings passed to ``make_calculator``.

    For each task directory the calculator's ``compute`` result is written to
    ``result.json`` inside that directory.
    """
    # Absolute task directories like /.../mp-xxx/relaxation/relax_task, sorted.
    task_dirs = sorted(
        os.path.abspath(os.path.join(conf_dir, "relaxation", "relax_task"))
        for pattern in confs
        for conf_dir in glob.glob(pattern)
    )

    # Dump the relaxation result of each task.
    for task_dir in task_dirs:
        calculator = make_calculator(inter_param, os.path.join(task_dir, "POSCAR"))
        result = calculator.compute(task_dir)
        dumpfn(result, os.path.join(task_dir, "result.json"), indent=4)