Source code for dpgen.auto_test.lib.util
import os
import re
import numpy as np
import requests
from dpgen.auto_test.lib import abacus, lammps, vasp
from dpgen.auto_test.lib.utils import cmd_append_log
# Task types that the helpers in this module route through their LAMMPS
# branch (LAMMPS input/log file names and the "model_devi_*" machine settings).
lammps_task_type = ["deepmd", "meam", "eam_fs", "eam_alloy"] # 06/13 revised
[docs]
def voigt_to_stress(inpt):
    """Expand six components into a symmetric 3x3 array.

    Parameters
    ----------
    inpt : sequence
        Indexable with at least six numeric entries. Entries 0-2 fill the
        diagonal; entries 3, 4 and 5 fill positions (0, 1), (0, 2) and
        (1, 2) respectively, and are mirrored below the diagonal.

    Returns
    -------
    numpy.ndarray
        A new 3x3 array created with ``np.zeros``.
    """
    # (row, col) target of each input component, in input order.
    slots = ((0, 0), (1, 1), (2, 2), (0, 1), (0, 2), (1, 2))
    stress = np.zeros((3, 3))
    for idx, (row, col) in enumerate(slots):
        stress[row][col] = inpt[idx]
        # Mirror the stored value; a no-op for the diagonal entries.
        stress[col][row] = stress[row][col]
    return stress
[docs]
def insert_data(task, task_type, username, file_name):
    """Upload the text of a result file to the test-data server.

    Parameters
    ----------
    task : str
        One of ``"eos"``, ``"elastic"`` or ``"surf"``; sent as the
        ``data_type`` query parameter.
    task_type : str
        One of ``"vasp"`` or ``"deepmd"``; sent as the ``expr_type`` query
        parameter.
    username : str
        Sent as the ``username`` query parameter.
    file_name : str
        Path of the file whose contents form the request body.

    Raises
    ------
    AssertionError
        If ``task`` or ``task_type`` is not one of the accepted values.
    requests.HTTPError
        If the server answers with a 4xx/5xx status.
    """
    assert task in ["eos", "elastic", "surf"]
    assert task_type in ["vasp", "deepmd"]
    url = "http://115.27.161.2:5000/insert_test_data"
    # Let requests build the query string so that special characters in
    # ``username`` are percent-encoded instead of corrupting the URL.
    params = {"username": username, "expr_type": task_type, "data_type": task}
    # Read through a context manager so the file handle is always closed.
    with open(file_name) as fp:
        payload = fp.read()
    res = requests.post(url, params=params, data=payload)
    # Only report success when the server actually accepted the upload.
    res.raise_for_status()
    print("Successful upload!")
[docs]
def make_work_path(jdata, task, reprod_opt, static, user):
    """Return the existing working directory for a task.

    The directory is derived from the absolute ``jdata["conf_dir"]`` by
    replacing every occurrence of ``"confs"`` with ``task`` and appending a
    sub-directory named after the task type plus an option-dependent suffix.

    Parameters
    ----------
    jdata : dict
        Must provide ``"task_type"`` and ``"conf_dir"``. Depending on the
        branch taken, ``"scf_incar"`` / ``"relax_incar"`` are looked up and
        ``jdata["vasp_params"]["kspacing"]`` is read when they are absent.
    task : str
        Replacement for ``"confs"`` in the configuration path.
    reprod_opt : bool
        For LAMMPS task types without ``static``, selects the ``-reprod-*``
        directory.
    static : bool
        Selects the ``-static*`` directory.
    user : bool
        For ``"vasp"``, selects the fixed ``vasp-user_incar`` directory.

    Returns
    -------
    str
        The working directory path.

    Raises
    ------
    AssertionError
        If the resulting path is not an existing directory.
    """
    kind = jdata["task_type"]
    conf_root = os.path.abspath(jdata["conf_dir"])
    task_root = re.sub("confs", task, conf_root)

    suffix = ""
    if kind == "vasp":
        if user:
            # User-supplied INCAR runs live in a fixed directory name.
            user_dir = os.path.join(task_root, "vasp-user_incar")
            assert os.path.isdir(user_dir)
            return user_dir
        if static and "scf_incar" in jdata:
            suffix = "-static-scf_incar"
        elif static:
            spacing = jdata["vasp_params"]["kspacing"]
            suffix = f"-static-k{spacing:.2f}"
        elif "relax_incar" in jdata:
            suffix = "-relax_incar"
        else:
            spacing = jdata["vasp_params"]["kspacing"]
            suffix = f"-k{spacing:.2f}"
    elif kind in lammps_task_type:
        if static:
            suffix = "-static"
        elif reprod_opt and "relax_incar" in jdata:
            suffix = "-reprod-relax_incar"
        elif reprod_opt:
            spacing = jdata["vasp_params"]["kspacing"]
            suffix = f"-reprod-k{spacing:.2f}"

    work_path = os.path.join(task_root, kind + suffix)
    assert os.path.isdir(work_path)
    return work_path
[docs]
def get_machine_info(mdata, task_type):
    """Select machine settings and the run command for a task type.

    Parameters
    ----------
    mdata : dict
        Machine configuration. The ``fp_*`` keys (``fp_command``,
        ``fp_group_size``, ``fp_resources``, ``fp_machine``) are read for
        ``"vasp"`` / ``"abacus"``; the ``model_devi_*`` counterparts are read
        for the LAMMPS task types.
    task_type : str
        ``"vasp"``, ``"abacus"`` or one of ``lammps_task_type``.

    Returns
    -------
    tuple
        ``(machine, resources, command, group_size)``, where ``command`` has
        been passed through ``cmd_append_log``.

    Raises
    ------
    ValueError
        If ``task_type`` is not supported. Previously this fell through to
        the ``return`` and failed with an opaque ``UnboundLocalError``.
    """
    if task_type in ["vasp", "abacus"]:
        command = mdata["fp_command"]
        group_size = mdata["fp_group_size"]
        resources = mdata["fp_resources"]
        machine = mdata["fp_machine"]
        command = cmd_append_log(command, "log")
    elif task_type in lammps_task_type:
        command = mdata["model_devi_command"] + " -i in.lammps"
        group_size = mdata["model_devi_group_size"]
        resources = mdata["model_devi_resources"]
        machine = mdata["model_devi_machine"]
        command = cmd_append_log(command, "model_devi.log")
    else:
        raise ValueError(f"unsupported task_type: {task_type!r}")
    return machine, resources, command, group_size
[docs]
def collect_task(all_task, task_type):
    """Return the base names of task directories that still need to run.

    A task directory is kept when its output file is missing, or when the
    file exists but the backend's ``check_finished`` returns a false value
    for it.

    Parameters
    ----------
    all_task : iterable of str
        Task directory paths.
    task_type : str
        ``"vasp"``, ``"abacus"`` or one of ``lammps_task_type``; selects the
        output file name and the ``check_finished`` function.

    Returns
    -------
    list of str
        ``os.path.basename`` of every directory that still needs to run, in
        input order.

    Raises
    ------
    ValueError
        If ``task_type`` is not supported. Previously this surfaced as an
        ``UnboundLocalError`` inside the loop.
    """
    if task_type == "vasp":
        output_file = "OUTCAR"
        check_finished = vasp.check_finished
    elif task_type in lammps_task_type:
        output_file = "log.lammps"
        check_finished = lammps.check_finished
    elif task_type == "abacus":
        output_file = "OUT.ABACUS/running_relax.log"
        check_finished = abacus.check_finished
    else:
        raise ValueError(f"unsupported task_type: {task_type!r}")

    run_tasks = []
    for task_dir in all_task:
        result_file = os.path.join(task_dir, output_file)
        # Skip only the tasks whose output exists and is reported finished.
        if os.path.isfile(result_file) and check_finished(result_file):
            continue
        run_tasks.append(os.path.basename(task_dir))
    return run_tasks