Source code for dpgen.tools.relabel

#!/usr/bin/env python3

import argparse
import glob
import json
import os
import shutil
import sys

sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
import dpdata

from dpgen.generator.lib.pwscf import make_pwscf_input
from dpgen.generator.lib.siesta import make_siesta_input
from dpgen.generator.run import update_mass_map


[docs] def get_lmp_info(input_file): with open(input_file) as fp: lines = [line.rstrip("\n") for line in fp] for ii in lines: words = ii.split() if len(words) >= 4 and words[0] == "variable": if words[1] == "TEMP": temp = float(words[3]) elif words[1] == "PRES": pres = float(words[3]) if len(words) >= 4 and words[0] == "fix": if words[3] == "nvt": ens = "nvt" elif words[3] == "npt": ens = "npt" return ens, temp, pres
[docs] def copy_pp_files(tdir, fp_pp_path, fp_pp_files): cwd = os.getcwd() os.chdir(tdir) for ii in fp_pp_files: if os.path.lexists(ii): os.remove(ii) if os.path.exists(ii): os.remove(ii) shutil.copyfile(os.path.join(fp_pp_path, ii), ii) os.chdir(cwd)
[docs] def make_vasp(tdir, fp_params): cwd = os.getcwd() os.chdir(tdir) incar = make_vasp_incar(fp_params) with open("INCAR", "w") as fp: fp.write(incar) os.chdir(cwd)
[docs] def make_vasp_incar(tdir, fp_incar): cwd = os.getcwd() os.chdir(tdir) shutil.copyfile(fp_incar, "INCAR") os.chdir(cwd)
[docs] def make_pwscf(tdir, fp_params, mass_map, fp_pp_path, fp_pp_files, user_input): cwd = os.getcwd() os.chdir(tdir) sys_data = dpdata.System("POSCAR").data sys_data["atom_masses"] = mass_map ret = make_pwscf_input(sys_data, fp_pp_files, fp_params) open("input", "w").write(ret) os.chdir(cwd)
[docs] def make_siesta(tdir, fp_params, fp_pp_path, fp_pp_files): cwd = os.getcwd() os.chdir(tdir) sys_data = dpdata.System("POSCAR").data ret = make_siesta_input(sys_data, fp_pp_files, fp_params) open("input", "w").write(ret) os.chdir(cwd)
[docs] def create_init_tasks(target_folder, param_file, output, fp_json, verbose=True): target_folder = os.path.abspath(target_folder) output = os.path.abspath(output) tool_path = os.path.join( os.path.dirname(os.path.realpath(__file__)), "..", "template" ) jdata = json.load(open(os.path.join(target_folder, param_file))) update_mass_map(jdata) fp_jdata = json.load(open(fp_json)) # fp settings mass_map = jdata["mass_map"] type_map = jdata["type_map"] fp_style = fp_jdata["fp_style"] fp_pp_path = fp_jdata["fp_pp_path"] fp_pp_files = fp_jdata["fp_pp_files"] cwd_ = os.getcwd() os.chdir(target_folder) fp_pp_path = os.path.abspath(fp_pp_path) os.chdir(cwd_) # init data sys init_data_prefix = jdata["init_data_prefix"] init_data_sys = jdata["init_data_sys"] for idx, ii in enumerate(init_data_sys): sys = dpdata.LabeledSystem( os.path.join(init_data_prefix, ii), fmt="deepmd/npy", type_map=type_map ) nframes = sys.get_nframes() sys_dir = os.path.join(output, "init_system.%03d" % idx) os.makedirs(sys_dir, exist_ok=True) if verbose: print("# working on " + sys_dir) with open(os.path.join(sys_dir, "record"), "w") as fp: fp.write(os.path.join(init_data_prefix, ii) + "\n") for ff in range(nframes): task_dir = os.path.join(sys_dir, "task.%06d" % ff) os.makedirs(task_dir, exist_ok=True) sys.to_vasp_poscar(os.path.join(task_dir, "POSCAR"), frame_idx=ff) # make fp cwd_ = os.getcwd() os.chdir(task_dir) for pp in fp_pp_files: if os.path.lexists(pp): os.remove(pp) os.symlink(os.path.relpath(os.path.join(output, pp)), pp) if fp_style == "vasp": if os.path.lexists("INCAR"): os.remove("INCAR") os.symlink(os.path.relpath(os.path.join(output, "INCAR")), "INCAR") elif fp_style == "pwscf": try: fp_params = fp_jdata["user_fp_params"] user_input = True except Exception: fp_params = fp_jdata["fp_params"] user_input = False make_pwscf( ".", fp_params, mass_map, fp_pp_files, fp_pp_files, user_input ) elif fp_style == "siesta": make_siesta(".", fp_params, fp_pp_files, fp_pp_files) os.chdir(cwd_)
[docs] def create_tasks( target_folder, param_file, output, fp_json, verbose=True, numb_iter=-1 ): target_folder = os.path.abspath(target_folder) output = os.path.abspath(output) tool_path = os.path.join( os.path.dirname(os.path.realpath(__file__)), "..", "template" ) jdata = json.load(open(os.path.join(target_folder, param_file))) update_mass_map(jdata) fp_jdata = json.load(open(fp_json)) # goto input cwd = os.getcwd() os.chdir(target_folder) sys = jdata["sys_configs"] # fp settings mass_map = jdata["mass_map"] fp_style = fp_jdata["fp_style"] fp_pp_path = fp_jdata["fp_pp_path"] fp_pp_files = fp_jdata["fp_pp_files"] cwd_ = os.getcwd() os.chdir(target_folder) fp_pp_path = os.path.abspath(fp_pp_path) os.chdir(cwd_) fp_params = fp_jdata["fp_params"] # collect tasks from iter dirs sys_tasks = [[] for ii in sys] sys_tasks_record = [[] for ii in sys] sys_tasks_cc = [0 for ii in sys] numb_sys = len(sys) iters = glob.glob("iter.[0-9]*[0-9]") iters.sort() # iters = iters[:2] for ii in iters[:numb_iter]: iter_tasks = glob.glob( os.path.join(ii, "02.fp", "task.[0-9]*[0-9].[0-9]*[0-9]") ) iter_tasks.sort() if verbose: print("# check iter " + ii + " with %6d tasks" % len(iter_tasks)) for jj in iter_tasks: sys_idx = int(os.path.basename(jj).split(".")[-2]) sys_tasks[sys_idx].append(jj) if os.path.islink(os.path.join(jj, "conf.lmp")): linked_file = os.path.realpath(os.path.join(jj, "conf.lmp")) elif os.path.islink(os.path.join(jj, "conf.dump")): linked_file = os.path.realpath(os.path.join(jj, "conf.dump")) else: raise RuntimeError("cannot file linked conf file") linked_keys = linked_file.split("/") task_record = ( linked_keys[-5] + "." + linked_keys[-3] + "." + linked_keys[-1].split(".")[0] ) task_record_keys = task_record.split(".") ens, temp, pres = get_lmp_info( os.path.join(ii, "01.model_devi", linked_keys[-3], "input.lammps") ) human_record = ( "iter: %s system: %s model_devi_task: %s frame: %6d fp_task: %s ens: %s temp: %10.2f pres: %10.2f" % ( task_record_keys[1], task_record_keys[3], linked_keys[-3], int(task_record_keys[-1]), os.path.basename(jj), ens, temp, pres, ) ) # print(human_record) sys_tasks_record[sys_idx].append(human_record) # for ii in range(numb_sys) : # for jj in range(len(sys_tasks[ii])) : # print(sys_tasks[ii][jj], sys_tasks_record[ii][jj]) # mk output os.makedirs(output, exist_ok=True) if fp_style == "vasp": copy_pp_files(output, fp_pp_path, fp_pp_files) make_vasp_incar(fp_params, output) if fp_style == "pwscf": copy_pp_files(output, fp_pp_path, fp_pp_files) if fp_style == "siesta": copy_pp_files(output, fp_pp_path, fp_pp_files) for si in range(numb_sys): sys_dir = os.path.join(output, "system.%03d" % si) if verbose: print("# working on " + sys_dir) for tt, rr in zip(sys_tasks[si], sys_tasks_record[si]): # copy poscar source_path = os.path.join(("iter.%s/02.fp" % rr.split()[1]), rr.split()[9]) source_file = os.path.join(source_path, "POSCAR") target_path = os.path.join(sys_dir, "task.%06d" % sys_tasks_cc[si]) sys_tasks_cc[si] += 1 os.makedirs(target_path, exist_ok=True) target_file = os.path.join(target_path, "POSCAR") target_recd = os.path.join(target_path, "record") if os.path.exists(target_file): os.remove(target_file) if os.path.exists(target_recd): os.remove(target_recd) shutil.copyfile(source_file, target_file) with open(target_recd, "w") as fp: fp.write("\n".join([target_folder, rr, ""])) # make fp cwd_ = os.getcwd() os.chdir(target_path) for pp in fp_pp_files: if os.path.lexists(pp): os.remove(pp) os.symlink(os.path.relpath(os.path.join(output, pp)), pp) if fp_style == "vasp": if os.path.lexists("INCAR"): os.remove("INCAR") os.symlink(os.path.relpath(os.path.join(output, "INCAR")), "INCAR") elif fp_style == "pwscf": try: fp_params = fp_jdata["user_fp_params"] user_input = True except Exception: fp_params = fp_jdata["fp_params"] user_input = False make_pwscf( ".", fp_params, mass_map, fp_pp_files, fp_pp_files, user_input ) elif fp_style == "siesta": make_siesta(".", fp_params, mass_map, fp_pp_files, fp_pp_files) os.chdir(cwd_) os.chdir(cwd)
def _main(): parser = argparse.ArgumentParser( description="Create tasks for relabeling from a DP-GEN job" ) parser.add_argument("JOB_DIR", type=str, help="the directory of the DP-GEN job") parser.add_argument( "PARAM", type=str, default="fp.json", help="the json file defines vasp tasks" ) parser.add_argument( "OUTPUT", type=str, help="the output directory of relabel tasks" ) parser.add_argument( "-p", "--parameter", type=str, default="param.json", help="the json file provides DP-GEN paramters, should be located in JOB_DIR", ) parser.add_argument( "-n", "--numb-iter", type=int, default=-1, help="number of iterations to relabel", ) parser.add_argument("-v", "--verbose", action="store_true", help="being loud") args = parser.parse_args() create_tasks( args.JOB_DIR, args.parameter, args.OUTPUT, args.PARAM, numb_iter=args.numb_iter, verbose=args.verbose, ) create_init_tasks( args.JOB_DIR, args.parameter, args.OUTPUT, args.PARAM, verbose=args.verbose ) if __name__ == "__main__": _main()