#!/usr/bin/env python3
import argparse
import glob
import json
import os
from collections import defaultdict
import numpy as np
from dpdispatcher import Machine, Resources, Submission, Task
from dpti.lib.lammps import get_natoms, get_thermo
from dpti.lib.utils import (
block_avg,
create_path,
get_first_matched_key_from_dict,
get_task_file_abspath,
integrate_range,
parse_seq,
relative_link_file,
)
def _gen_lammps_input(
conf_file,
mass_map,
mass_scale,
model,
template_ff,
nbeads,
nsteps,
timestep,
ens,
temp,
pres=1.0,
tau_t=0.1,
tau_p=0.5,
thermo_freq=100,
dump_freq=100,
copies=None,
):
if nbeads is not None:
if nbeads <= 0:
raise ValueError(
"The number of beads should be positive. Check your nbeads setting."
)
power = 1
while power < nbeads:
power *= 10
ret = ""
ret += "clear\n"
ret += "# --------------------- VARIABLES-------------------------\n"
ret += "variable ibead uloop %d pad\n" % (power - 1)
ret += "variable NSTEPS equal %d\n" % nsteps
ret += "variable THERMO_FREQ equal %d\n" % thermo_freq
ret += "variable DUMP_FREQ equal %d\n" % dump_freq
ret += f"variable TEMP equal {temp:f}\n"
ret += f"variable PRES equal {pres:f}\n"
ret += f"variable TAU_T equal {tau_t:f}\n"
ret += f"variable TAU_P equal {tau_p:f}\n"
ret += "# ---------------------- INITIALIZAITION ------------------\n"
ret += "units metal\n"
ret += "boundary p p p\n"
ret += "atom_style atomic\n"
ret += "atom_modify map yes\n"
ret += "# --------------------- ATOM DEFINITION ------------------\n"
ret += "box tilt large\n"
ret += f'if "${{restart}} > 0" then "read_restart ${{ibead}}.restart.*" else "read_data {conf_file}"\n'
if copies is not None:
ret += "replicate %d %d %d\n" % (copies[0], copies[1], copies[2])
for jj in range(len(mass_map)):
ret += "mass %d %f\n" % (jj + 1, mass_map[jj] * mass_scale)
ret += "# --------------------- FORCE FIELDS ---------------------\n"
if model is not None:
ret += f"pair_style deepmd {model}\n"
ret += "pair_coeff * *\n"
elif template_ff is not None:
ret += template_ff
ret += "# --------------------- MD SETTINGS ----------------------\n"
ret += "neighbor 2.0 bin\n"
ret += "neigh_modify every 10 delay 0 check no\n"
ret += f"timestep {timestep}\n"
ret += "thermo ${THERMO_FREQ}\n"
ret += "compute allmsd all msd\n"
ret += "dump 1 all custom ${DUMP_FREQ} ${ibead}.dump id type x y z\n"
if ens == "nvt":
ret += "fix 1 all pimd/langevin ensemble nvt integrator obabo temp ${TEMP} thermostat PILE_L 1234 tau ${TAU_T}\n"
elif ens == "npt-iso" or ens == "npt":
ret += "fix 1 all pimd/langevin ensemble npt integrator obabo temp ${TEMP} thermostat PILE_L 1234 tau ${TAU_T} iso ${PRES} barostat BZP taup ${TAU_P}\n"
elif ens == "npt-aniso":
ret += "fix 1 all pimd/langevin ensemble npt integrator obabo temp ${TEMP} thermostat PILE_L 1234 tau ${TAU_T} aniso ${PRES} barostat BZP taup ${TAU_P}\n"
elif ens == "npt-tri":
raise RuntimeError("npt-tri is not supported yet")
elif ens == "npt-xy":
raise RuntimeError("npt-xy is not supported yet")
elif ens == "nve":
ret += "fix 1 all pimd/langevin ensemble nve integrator obabo temp ${TEMP}\n"
else:
raise RuntimeError(f"unknow ensemble {ens}\n")
if ens == "nvt" or ens == "nve":
ret += "thermo_style custom step temp f_1[5] f_1[7] c_allmsd[*]\n"
ret += "thermo_modify format float %6.8e\n"
ret += 'fix print all print ${THERMO_FREQ} "$(step) $(temp) $(f_1[5]) $(f_1[7])" append ${ibead}.out title "# step temp K_prim K_cv" screen no'
elif "npt" in ens:
ret += "thermo_style custom step temp vol density f_1[5] f_1[7] f_1[8] f_1[10] c_allmsd[*]\n"
ret += "thermo_modify format float %6.8e\n"
ret += 'fix print all print ${THERMO_FREQ} "$(step) $(temp) $(vol) $(density) $(f_1[5]) $(f_1[7])" append ${ibead}.out title "# step temp vol density K_prim K_cv" screen no'
else:
raise RuntimeError(f"unknow ensemble {ens}\n")
ret += "# --------------------- INITIALIZE -----------------------\n"
ret += "# --------------------- RUN ------------------------------\n"
ret += "restart 100000 ${ibead}.restart\n"
ret += "run ${NSTEPS} upto\n"
ret += "write_data ${ibead}.out.lmp\n"
return ret
[docs]
def make_tasks(iter_name, jdata):
ti_settings = jdata.copy()
equi_conf = jdata["equi_conf"]
equi_conf = os.path.abspath(equi_conf)
copies = None
if "copies" in jdata:
copies = jdata["copies"]
model = jdata.get("model", None)
template_ff_file = jdata.get("template_ff", None)
template_ff = None
if template_ff_file is not None:
with open(template_ff_file) as f:
template_ff = f.read()
if model is not None and template_ff is not None:
raise RuntimeError(
"You are providing both a dp model and a template forcefield. You can only set one of model and template_ff."
)
if model is None and template_ff is None:
raise RuntimeError(
"You must provide a dp model or a template forcefield. Please set either model or template_ff."
)
mass_map = get_first_matched_key_from_dict(jdata, ["model_mass_map", "mass_map"])
nsteps = jdata["nsteps"]
timestep = get_first_matched_key_from_dict(jdata, ["timestep", "dt"])
thermo_freq = get_first_matched_key_from_dict(jdata, ["thermo_freq", "stat_freq"])
dump_freq = get_first_matched_key_from_dict(
jdata, ["dump_freq", "thermo_freq", "stat_freq"]
)
ens = jdata["ens"]
path = jdata["path"]
if "npt" in ens:
if path == "t":
temp_seq = get_first_matched_key_from_dict(jdata, ["temp_seq", "temps"])
temp_list = parse_seq(temp_seq)
pres = get_first_matched_key_from_dict(jdata, ["pres", "press"])
ntasks = len(temp_list)
elif path == "p":
temp = get_first_matched_key_from_dict(jdata, ["temp", "temps"])
pres_seq = get_first_matched_key_from_dict(jdata, ["pres_seq", "press"])
pres_list = parse_seq(pres_seq)
ntasks = len(pres_list)
else:
raise RuntimeError("supported path of npt ens are 't' or 'p'")
tau_t = jdata["tau_t"]
tau_p = jdata["tau_p"]
else:
raise RuntimeError("invalid ens")
job_type = jdata["job_type"]
assert (
job_type == "nbead_convergence" or job_type == "mass_ti"
), "Unknow job_type. Only nbead_convergence and mass_ti are supported."
mass_scale_y_seq = get_first_matched_key_from_dict(jdata, ["mass_scale_y"])
mass_scale_y_list = parse_seq(mass_scale_y_seq)
mass_scales = (1.0 / np.array(mass_scale_y_list)) ** 2
nbead_seq = get_first_matched_key_from_dict(jdata, ["nbead"])
nbead_list = parse_seq(nbead_seq)
nnode_seq = jdata.get("nnode", None)
if nnode_seq is not None:
nnode_list = parse_seq(nnode_seq)
assert (
len(nbead_list) == len(nnode_list)
), "Lists nbead and nnode should have same length. Please specify one nnode for each nbead."
if job_type == "mass_ti":
assert (
len(mass_scale_y_list) == len(nbead_list)
), "For mass TI tasks, you must provide one value of nbead for each value of mass_scale_y."
job_abs_dir = create_path(iter_name)
ti_settings["equi_conf"] = relative_link_file(equi_conf, job_abs_dir)
if model is not None:
ti_settings["model"] = relative_link_file(model, job_abs_dir)
if template_ff is not None:
ti_settings["template_ff"] = relative_link_file(template_ff_file, job_abs_dir)
with open(os.path.join(job_abs_dir, "mti_settings.json"), "w") as f:
json.dump(ti_settings, f, indent=4)
for ii in range(ntasks):
task_dir = os.path.join(job_abs_dir, "task.%06d" % ii)
task_abs_dir = create_path(task_dir)
settings = {}
if path == "t":
temp = temp_list[ii]
elif path == "p":
pres = pres_list[ii]
else:
raise RuntimeError("unsupported path")
settings["temp"] = temp
settings["pres"] = pres
if job_type == "nbead_convergence":
for jj in range(len(mass_scale_y_list)):
mass_scale_y_dir = os.path.join(task_abs_dir, "mass_scale_y.%06d" % jj)
mass_scale_y_abs_dir = create_path(mass_scale_y_dir)
settings["mass_scale_y"] = mass_scale_y_list[jj]
settings["mass_scale"] = mass_scales[jj]
for kk in range(len(nbead_list)):
nbead_dir = os.path.join(mass_scale_y_abs_dir, "nbead.%06d" % kk)
nbead_abs_dir = create_path(nbead_dir)
settings["nbead"] = nbead_list[kk]
if nnode_seq is not None:
settings["nnode"] = nnode_list[kk]
relative_link_file(equi_conf, nbead_abs_dir)
task_model = model
if model is not None:
relative_link_file(model, nbead_abs_dir)
task_model = os.path.basename(model)
lmp_str = _gen_lammps_input(
os.path.basename(equi_conf),
mass_map,
mass_scales[jj],
task_model,
None,
nbead_list[kk],
nsteps,
timestep,
ens,
temp=temp,
pres=pres,
tau_t=tau_t,
thermo_freq=thermo_freq,
dump_freq=dump_freq,
copies=copies,
)
elif template_ff is not None:
lmp_str = _gen_lammps_input(
os.path.basename(equi_conf),
mass_map,
mass_scales[jj],
None,
template_ff,
nbead_list[kk],
nsteps,
timestep,
ens,
temp=temp,
pres=pres,
tau_t=tau_t,
thermo_freq=thermo_freq,
dump_freq=dump_freq,
copies=copies,
)
with open(os.path.join(nbead_abs_dir, "in.lammps"), "w") as f:
f.write(lmp_str)
with open(os.path.join(nbead_abs_dir, "settings.json"), "w") as f:
json.dump(settings, f, indent=4)
elif job_type == "mass_ti":
for jj in range(len(mass_scale_y_list)):
mass_scale_y_dir = os.path.join(task_abs_dir, "mass_scale_y.%06d" % jj)
mass_scale_y_abs_dir = create_path(mass_scale_y_dir)
settings["mass_scale_y"] = mass_scale_y_list[jj]
settings["mass_scale"] = mass_scales[jj]
settings["nbead"] = nbead_list[jj]
if nnode_seq is not None:
settings["nnode"] = nnode_list[jj]
relative_link_file(equi_conf, mass_scale_y_abs_dir)
task_model = model
if model:
relative_link_file(model, mass_scale_y_abs_dir)
task_model = os.path.basename(model)
lmp_str = _gen_lammps_input(
os.path.basename(equi_conf),
mass_map,
mass_scales[jj],
task_model,
None,
nbead_list[jj],
nsteps,
timestep,
ens,
temp=temp,
pres=pres,
tau_t=tau_t,
thermo_freq=thermo_freq,
dump_freq=dump_freq,
copies=copies,
)
elif template_ff is not None:
lmp_str = _gen_lammps_input(
os.path.basename(equi_conf),
mass_map,
mass_scales[jj],
None,
template_ff,
nbead_list[jj],
nsteps,
timestep,
ens,
temp=temp,
pres=pres,
tau_t=tau_t,
thermo_freq=thermo_freq,
dump_freq=dump_freq,
copies=copies,
)
with open(os.path.join(mass_scale_y_abs_dir, "in.lammps"), "w") as f:
f.write(lmp_str)
with open(
os.path.join(mass_scale_y_abs_dir, "settings.json"), "w"
) as f:
json.dump(settings, f, indent=4)
[docs]
def run_task(task_name, jdata, machine_file):
job_type = jdata["job_type"]
nprocs_per_bead = jdata.get("nprocs_per_bead", 1)
if job_type == "nbead_convergence":
task_dir_list = glob.glob(
os.path.join(task_name, "task.*/mass_scale_y.*/nbead.*")
)
link_model = "ln -s ../../../graph.pb"
elif job_type == "mass_ti":
task_dir_list = glob.glob(os.path.join(task_name, "task.*/mass_scale_y.*"))
link_model = "ln -s ../../graph.pb"
else:
raise RuntimeError(
"Unknow job_type. Only nbead_convergence and mass_ti are supported."
)
task_dir_list = sorted(task_dir_list)
work_base_dir = os.getcwd()
with open(machine_file) as f:
mdata = json.load(f)
task_exec = mdata["command"]
number_node = mdata.get("resources", {}).get("number_node", 1)
machine = Machine.load_from_dict(mdata["machine"])
for ii in task_dir_list:
setting = json.load(open(os.path.join(ii, "settings.json")))
nbead = int(setting["nbead"])
nnode = setting.get("nnode", None)
if nnode is not None:
mdata["resources"]["number_node"] = int(nnode)
number_node = nnode
mdata["resources"]["cpu_per_node"] = int(
np.ceil(nbead * nprocs_per_bead / number_node)
)
resources = Resources.load_from_dict(mdata["resources"])
submission = Submission(
work_base=work_base_dir,
resources=resources,
machine=machine,
)
task = Task(
command=f"{link_model}; if ls *.restart.100000 1> /dev/null 2>&1; then {task_exec} -in in.lammps -p {nbead}x{nprocs_per_bead} -log log -v restart 1; else {task_exec} -in in.lammps -p {nbead}x{nprocs_per_bead} -log log -v restart 0; fi",
task_work_path=ii,
forward_files=["in.lammps", "*.lmp", "graph.pb"],
backward_files=["log*", "*out.lmp", "*.dump"],
)
submission.forward_common_files = []
submission.register_task_list(task_list=[task])
submission.run_submission(exit_on_submit=True)
[docs]
def post_tasks(iter_name, jdata, natoms_mol=None):
equi_conf = get_task_file_abspath(iter_name, jdata["equi_conf"])
natoms = get_natoms(equi_conf)
if "copies" in jdata:
natoms *= np.prod(jdata["copies"])
if natoms_mol is not None:
natoms /= natoms_mol
job_type = jdata["job_type"]
stat_skip = jdata["stat_skip"]
stat_bsize = jdata["stat_bsize"]
if job_type == "nbead_convergence":
counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
task_dir_list = glob.glob(
os.path.join(iter_name, "task.*/mass_scale_y.*/nbead.*")
)
task_dir_list = sorted(task_dir_list)
for ii in task_dir_list:
parts = ii.split(os.sep)
task, mass_scale_y, nbead = None, None, None
for part in parts:
if part.startswith("task."):
task = part
elif part.startswith("mass_scale_y."):
mass_scale_y = part
elif part.startswith("nbead."):
nbead = part
counts[task][mass_scale_y][nbead] += 1
elif job_type == "mass_ti":
counts = defaultdict(lambda: defaultdict(int))
task_dir_list = glob.glob(os.path.join(iter_name, "task.*/mass_scale_y.*"))
task_dir_list = sorted(task_dir_list)
for ii in task_dir_list:
parts = ii.split(os.sep)
task, mass_scale_y = None, None
for part in parts:
if part.startswith("task."):
task = part
elif part.startswith("mass_scale_y."):
mass_scale_y = part
counts[task][mass_scale_y] += 1
else:
raise RuntimeError(
"Unknow job_type. Only nbead_convergence and mass_ti are supported."
)
ens = jdata["ens"]
if ens == "nvt" or ens == "nve":
stat_col = 3
elif "npt" in ens:
stat_col = 5
else:
raise RuntimeError("unsupported ens")
if job_type == "nbead_convergence":
for task in counts.keys():
for mass_scale_y in counts[task].keys():
result = []
for nbead in counts[task][mass_scale_y].keys():
if counts[task][mass_scale_y][nbead] == 0:
continue
task_dir = os.path.join(iter_name, task, mass_scale_y, nbead)
settings = json.load(open(os.path.join(task_dir, "settings.json")))
out_files = glob.glob(os.path.join(task_dir, "*1.out"))
if len(out_files) == 0:
log_name = os.path.join(task_dir, "log.0")
data = get_thermo(log_name)
np.savetxt(os.path.join(task_dir, "data"), data, fmt="%.6e")
else:
out_files = sorted(out_files)
data = np.loadtxt(out_files[0])
num_nbead = settings["nbead"]
kcv, kcverr = block_avg(
data[:, stat_col], skip=stat_skip, block_size=stat_bsize
)
result.append([num_nbead, kcv, kcverr])
result = np.array(result)
np.savetxt(
os.path.join(iter_name, task, mass_scale_y, "kcv.out"),
result,
fmt=["%12d", "%22.6e", "%22.6e"],
header=f"{'nbead':>12} {'kcv':>22} {'kcv_err':>22}",
)
elif job_type == "mass_ti":
for task in counts.keys():
result = []
for mass_scale_y in counts[task].keys():
if counts[task][mass_scale_y] == 0:
continue
task_dir = os.path.join(iter_name, task, mass_scale_y)
settings = json.load(open(os.path.join(task_dir, "settings.json")))
out_files = glob.glob(os.path.join(task_dir, "*1.out"))
if len(out_files) == 0:
log_name = os.path.join(task_dir, "log.0")
data = get_thermo(log_name)
np.savetxt(os.path.join(task_dir, "data"), data, fmt="%.6e")
else:
out_files = sorted(out_files)
data = np.loadtxt(out_files[0])
mass_scale_y_value = settings["mass_scale_y"]
kcv, kcverr = block_avg(
data[:, stat_col], skip=stat_skip, block_size=stat_bsize
)
result.append([mass_scale_y_value, kcv, kcverr])
result = np.array(result)
np.savetxt(
os.path.join(iter_name, task, "kcv.out"),
result,
fmt=["%22.6e", "%22.6e", "%22.6e"],
header="# mass_scale_y kcv kcv_err",
)
mass_scale_y_values, kcv_inte, kcv_inte_err, kcv_stat_err = integrate_range(
result[:, 0], result[:, 1], result[:, 2]
)
np.savetxt(
os.path.join(iter_name, task, "kcv_inte.out"),
np.array([kcv_inte, kcv_inte_err, kcv_stat_err]),
fmt=["%22.6e", "%22.6e", "%22.6e"],
header="# kcv_inte kcv_inte_err kcv_stat_err",
)
else:
raise RuntimeError(
"Unknow job_type. Only nbead_convergence and mass_ti are supported."
)
def _main():
parser = argparse.ArgumentParser(
description="Compute free energy of ice by Hamiltonian TI"
)
main_subparsers = parser.add_subparsers(
title="modules",
description="the subcommands of dpti",
help="module-level help",
dest="module",
required=True,
)
add_subparsers(main_subparsers)
args = parser.parse_args()
exec_args(args, parser)
[docs]
def exec_args(args, parser):
if hasattr(args, "func"):
args.func(args)
else:
parser.print_help()
[docs]
def add_module_subparsers(main_subparsers):
module_parser = main_subparsers.add_parser(
"mti",
help="mass thermodynamic integration: quantum free energy calculation using PIMD",
)
module_subparsers = module_parser.add_subparsers(
help="commands of mass thermodynamic integration",
dest="command",
required=True,
)
add_subparsers(module_subparsers)
[docs]
def add_subparsers(module_subparsers):
parser_gen = module_subparsers.add_parser("gen", help="Generate a job")
parser_gen.add_argument("PARAM", type=str, help="json parameter file")
parser_gen.add_argument(
"-o",
"--output",
type=str,
default="new_job",
help="the output folder for the job",
)
parser_gen.set_defaults(func=handle_gen)
parser_run = module_subparsers.add_parser("run", help="run the job")
parser_run.add_argument("JOB", type=str, help="folder of the job")
parser_run.add_argument("PARAM", type=str, help="json parameter file")
parser_run.add_argument("machine", type=str, help="machine.json file for the job")
parser_run.set_defaults(func=handle_run)
parser_compute = module_subparsers.add_parser(
"compute", help="Compute the result of a job"
)
parser_compute.add_argument("JOB", type=str, help="folder of the job")
parser_compute.add_argument(
"--natom_mol", type=int, help="the number of atoms in the molecule"
)
parser_compute.set_defaults(func=handle_compute)
[docs]
def handle_gen(args):
with open(args.PARAM) as j:
jdata = json.load(j)
make_tasks(args.output, jdata)
[docs]
def handle_run(args):
with open(args.PARAM) as j:
jdata = json.load(j)
run_task(args.JOB, jdata, args.machine)
[docs]
def handle_compute(args):
with open(os.path.join(args.JOB, "mti_settings.json")) as j:
jdata = json.load(j)
post_tasks(args.JOB, jdata, args.natom_mol)
if __name__ == "__main__":
_main()