Source code for dpgen2.entrypoint.submit

import copy
import glob
import json
import logging
import os
import pickle
import re
from pathlib import (
    Path,
)
from typing import (
    Dict,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)

import dpdata
from dflow import (
    ArgoStep,
    InputArtifact,
    InputParameter,
    Inputs,
    OutputArtifact,
    OutputParameter,
    Outputs,
    S3Artifact,
    Step,
    Steps,
    Workflow,
    argo_range,
    download_artifact,
    upload_artifact,
)
from dflow.python import (
    OP,
    OPIO,
    Artifact,
    FatalError,
    OPIOSign,
    PythonOPTemplate,
    TransientError,
    upload_packages,
)

from dpgen2.conf import (
    conf_styles,
)
from dpgen2.constants import (
    default_host,
    default_image,
)
from dpgen2.entrypoint.args import normalize as normalize_args
from dpgen2.entrypoint.common import (
    expand_idx,
    expand_sys_str,
    global_config_workflow,
)
from dpgen2.exploration.render import (
    TrajRenderLammps,
)
from dpgen2.exploration.report import (
    ExplorationReportTrustLevelsRandom,
    conv_styles,
)
from dpgen2.exploration.scheduler import (
    ConvergenceCheckStageScheduler,
    ExplorationScheduler,
)
from dpgen2.exploration.selector import (
    ConfSelectorFrames,
)
from dpgen2.exploration.task import (
    CustomizedLmpTemplateTaskGroup,
    ExplorationStage,
    ExplorationTask,
    LmpTemplateTaskGroup,
    NPTTaskGroup,
    caly_normalize,
    make_calypso_task_group_from_config,
    make_lmp_task_group_from_config,
    normalize_lmp_task_group_config,
)
from dpgen2.flow import (
    ConcurrentLearning,
)
from dpgen2.fp import (
    fp_styles,
)
from dpgen2.op import (
    CollectData,
    CollRunCaly,
    PrepCalyInput,
    PrepDPTrain,
    PrepLmp,
    PrepRunDPOptim,
    RunCalyModelDevi,
    RunDPTrain,
    RunLmp,
    SelectConfs,
)
from dpgen2.superop import (
    CalyEvoStep,
    ConcurrentLearningBlock,
    PrepRunCaly,
    PrepRunDPTrain,
    PrepRunFp,
    PrepRunLmp,
)
from dpgen2.utils import (
    BinaryFileInput,
    bohrium_config_from_dict,
    dump_object_to_file,
    get_subkey,
    load_object_from_file,
    matched_step_key,
    print_keys_in_nice_format,
    sort_slice_ops,
    workflow_config_from_dict,
)
from dpgen2.utils.step_config import normalize as normalize_step_dict

default_config = normalize_step_dict(
    {
        "template_config": {
            "image": default_image,
        }
    }
)


def make_concurrent_learning_op(
    train_style: str = "dp",
    explore_style: str = "lmp",
    fp_style: str = "vasp",
    prep_train_config: dict = default_config,
    run_train_config: dict = default_config,
    prep_explore_config: dict = default_config,
    run_explore_config: dict = default_config,
    prep_fp_config: dict = default_config,
    run_fp_config: dict = default_config,
    select_confs_config: dict = default_config,
    collect_data_config: dict = default_config,
    cl_step_config: dict = default_config,
    upload_python_packages: Optional[List[os.PathLike]] = None,
):
    if train_style in ("dp", "dp-dist"):
        prep_run_train_op = PrepRunDPTrain(
            "prep-run-dp-train",
            PrepDPTrain,
            RunDPTrain,
            prep_config=prep_train_config,
            run_config=run_train_config,
            upload_python_packages=upload_python_packages,
        )
    else:
        raise RuntimeError(f"unknown train_style {train_style}")
    if explore_style == "lmp":
        prep_run_explore_op = PrepRunLmp(
            "prep-run-lmp",
            PrepLmp,
            RunLmp,
            prep_config=prep_explore_config,
            run_config=run_explore_config,
            upload_python_packages=upload_python_packages,
        )
    elif explore_style == "calypso":
        caly_evo_step_op = CalyEvoStep(
            "caly-evo-step",
            collect_run_caly=CollRunCaly,
            prep_run_dp_optim=PrepRunDPOptim,
            prep_config=prep_explore_config,
            run_config=run_explore_config,
            upload_python_packages=upload_python_packages,
        )
        prep_run_explore_op = PrepRunCaly(
            "prep-run-calypso",
            prep_caly_input_op=PrepCalyInput,
            caly_evo_step_op=caly_evo_step_op,
            run_caly_model_devi_op=RunCalyModelDevi,
            prep_config=prep_explore_config,
            run_config=run_explore_config,
            upload_python_packages=upload_python_packages,
        )
    else:
        raise RuntimeError(f"unknown explore_style {explore_style}")

    if fp_style in fp_styles.keys():
        prep_run_fp_op = PrepRunFp(
            "prep-run-fp",
            fp_styles[fp_style]["prep"],
            fp_styles[fp_style]["run"],
            prep_config=prep_fp_config,
            run_config=run_fp_config,
            upload_python_packages=upload_python_packages,
        )
    else:
        raise RuntimeError(f"unknown fp_style {fp_style}")

    # ConcurrentLearningBlock
    block_cl_op = ConcurrentLearningBlock(
        "concurrent-learning-block",
        prep_run_train_op,
        prep_run_explore_op,
        SelectConfs,
        prep_run_fp_op,
        CollectData,
        select_confs_config=select_confs_config,
        collect_data_config=collect_data_config,
        upload_python_packages=upload_python_packages,
    )
    # dpgen
    dpgen_op = ConcurrentLearning(
        "concurrent-learning",
        block_cl_op,
        upload_python_packages=upload_python_packages,
        step_config=cl_step_config,
    )

    return dpgen_op
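
# Usage sketch (hypothetical argument values; all step configs default to the
# module-level `default_config`):
#
#   dpgen_op = make_concurrent_learning_op(
#       train_style="dp",
#       explore_style="lmp",
#       fp_style="vasp",
#   )
#
# The returned ConcurrentLearning template is instantiated as the "dpgen-step"
# Step in workflow_concurrent_learning below.
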
def make_naive_exploration_scheduler(
    config,
):
    explore_style = config["explore"]["type"]

    if explore_style == "lmp":
        return make_lmp_naive_exploration_scheduler(config)
    elif explore_style == "calypso":
        return make_calypso_naive_exploration_scheduler(config)
    else:
        raise RuntimeError(f"unknown explore_style {explore_style}")
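
# Dispatch sketch (hypothetical config fragment): the scheduler flavor is
# selected by config["explore"]["type"], e.g.
#
#   config = {"explore": {"type": "lmp", ...}, ...}
#   scheduler = make_naive_exploration_scheduler(config)
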
def make_calypso_naive_exploration_scheduler(config):
    model_devi_jobs = config["explore"]["stages"]
    fp_task_max = config["fp"]["task_max"]
    max_numb_iter = config["explore"]["max_numb_iter"]
    fatal_at_max = config["explore"]["fatal_at_max"]
    convergence = config["explore"]["convergence"]
    output_nopbc = config["explore"]["output_nopbc"]
    scheduler = ExplorationScheduler()
    # report
    conv_style = convergence.pop("type")
    report = conv_styles[conv_style](**convergence)
    render = TrajRenderLammps(nopbc=output_nopbc)
    # selector
    selector = ConfSelectorFrames(
        render,
        report,
        fp_task_max,
    )

    for job_ in model_devi_jobs:
        if not isinstance(job_, list):
            job = [job_]
        else:
            job = job_
        # stage
        stage = ExplorationStage()
        for jj in job:
            jconf = caly_normalize(jj)
            # make task group
            tgroup = make_calypso_task_group_from_config(jconf)
            # add the task group to the stage
            tasks = tgroup.make_task()
            stage.add_task_group(tasks)
        # stage_scheduler
        stage_scheduler = ConvergenceCheckStageScheduler(
            stage,
            selector,
            max_numb_iter=max_numb_iter,
            fatal_at_max=fatal_at_max,
        )
        # scheduler
        scheduler.add_stage_scheduler(stage_scheduler)

    return scheduler
def make_lmp_naive_exploration_scheduler(config):
    model_devi_jobs = config["explore"]["stages"]
    sys_configs = config["explore"]["configurations"]
    mass_map = config["inputs"]["mass_map"]
    type_map = config["inputs"]["type_map"]
    numb_models = config["train"]["numb_models"]
    fp_task_max = config["fp"]["task_max"]
    max_numb_iter = config["explore"]["max_numb_iter"]
    fatal_at_max = config["explore"]["fatal_at_max"]
    convergence = config["explore"]["convergence"]
    output_nopbc = config["explore"]["output_nopbc"]
    scheduler = ExplorationScheduler()
    # report
    conv_style = convergence.pop("type")
    report = conv_styles[conv_style](**convergence)
    render = TrajRenderLammps(nopbc=output_nopbc)
    # selector
    selector = ConfSelectorFrames(
        render,
        report,
        fp_task_max,
    )

    sys_configs_lmp = []
    for sys_config in sys_configs:
        conf_style = sys_config.pop("type")
        generator = conf_styles[conf_style](**sys_config)
        sys_configs_lmp.append(generator.get_file_content(type_map))

    for job_ in model_devi_jobs:
        if not isinstance(job_, list):
            job = [job_]
        else:
            job = job_
        # stage
        stage = ExplorationStage()
        for jj in job:
            jconf = normalize_lmp_task_group_config(jj)
            n_sample = jconf.pop("n_sample")
            ## ignore the expansion of sys_idx
            # get all file names of md initial configurations
            sys_idx = jconf.pop("conf_idx")
            conf_list = []
            for ii in sys_idx:
                conf_list += sys_configs_lmp[ii]
            # make task group
            tgroup = make_lmp_task_group_from_config(numb_models, mass_map, jconf)
            # set the initial configurations of the task group
            tgroup.set_conf(
                conf_list,
                n_sample=n_sample,
                random_sample=True,
            )
            tasks = tgroup.make_task()
            stage.add_task_group(tasks)
        # stage_scheduler
        stage_scheduler = ConvergenceCheckStageScheduler(
            stage,
            selector,
            max_numb_iter=max_numb_iter,
            fatal_at_max=fatal_at_max,
        )
        # scheduler
        scheduler.add_stage_scheduler(stage_scheduler)

    return scheduler
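
# Illustrative stage entry (the "lmp-md" fields shown beyond "conf_idx" and
# "n_sample" are assumptions; normalize_lmp_task_group_config defines the
# actual schema):
#
#   config["explore"]["stages"] = [
#       [{"type": "lmp-md", "conf_idx": [0], "n_sample": 2}],
#   ]
#
# "conf_idx" and "n_sample" are popped above; the rest of each dict is handed
# to make_lmp_task_group_from_config.
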
def get_kspacing_kgamma_from_incar(
    fname,
):
    with open(fname) as fp:
        lines = fp.readlines()
    ks = None
    kg = None
    for ii in lines:
        if "KSPACING" in ii:
            ks = float(ii.split("=")[1])
        if "KGAMMA" in ii:
            if "T" in ii.split("=")[1]:
                kg = True
            elif "F" in ii.split("=")[1]:
                kg = False
            else:
                raise RuntimeError(f"invalid kgamma value {ii.split('=')[1]}")
    assert ks is not None and kg is not None
    return ks, kg
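
# A minimal sketch of the INCAR content this parser expects (line-based
# "KEY = VALUE" pairs):
#
#   KSPACING = 0.5
#   KGAMMA = .TRUE.
#
# which would yield (0.5, True).
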
def make_optional_parameter(
    mixed_type=False,
    finetune_mode="no",
):
    return {"data_mixed_type": mixed_type, "finetune_mode": finetune_mode}
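
# For example:
#   make_optional_parameter(mixed_type=True)
#   # -> {"data_mixed_type": True, "finetune_mode": "no"}
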
def make_finetune_step(
    config,
    prep_train_config,
    run_train_config,
    upload_python_packages,
    numb_models,
    template_script,
    train_config,
    init_models,
    init_data,
    iter_data,
):
    finetune_optional_parameter = {
        "mixed_type": config["inputs"]["mixed_type"],
        "finetune_mode": "finetune",
    }

    finetune_op = PrepRunDPTrain(
        "finetune",
        PrepDPTrain,
        RunDPTrain,
        prep_config=prep_train_config,
        run_config=run_train_config,
        upload_python_packages=upload_python_packages,
        finetune=True,
    )
    finetune_step = Step(
        "finetune-step",
        template=finetune_op,
        parameters={
            "block_id": "finetune",
            "numb_models": numb_models,
            "template_script": template_script,
            "train_config": train_config,
            "run_optional_parameter": finetune_optional_parameter,
        },
        artifacts={
            "init_models": init_models,
            "init_data": init_data,
            "iter_data": iter_data,
        },
    )
    return finetune_step
def workflow_concurrent_learning(
    config: Dict,
) -> Tuple[Step, Optional[Step]]:
    default_config = config["default_step_config"]

    train_style = config["train"]["type"]
    explore_style = config["explore"]["type"]
    fp_style = config["fp"]["type"]
    prep_train_config = config["step_configs"]["prep_train_config"]
    run_train_config = config["step_configs"]["run_train_config"]
    prep_explore_config = config["step_configs"]["prep_explore_config"]
    run_explore_config = config["step_configs"]["run_explore_config"]
    prep_fp_config = config["step_configs"]["prep_fp_config"]
    run_fp_config = config["step_configs"]["run_fp_config"]
    select_confs_config = config["step_configs"]["select_confs_config"]
    collect_data_config = config["step_configs"]["collect_data_config"]
    cl_step_config = config["step_configs"]["cl_step_config"]
    upload_python_packages = config.get("upload_python_packages", None)

    if train_style == "dp":
        init_models_paths = config["train"].get("init_models_paths", None)
        numb_models = config["train"]["numb_models"]
        if init_models_paths is not None and len(init_models_paths) != numb_models:
            raise RuntimeError(
                f"{len(init_models_paths)} init models provided, which does "
                f"not match numb_models={numb_models}"
            )
    elif train_style == "dp-dist":
        init_models_paths = (
            [config["train"]["student_model_path"]]
            if "student_model_path" in config["train"]
            else None
        )
        config["train"]["numb_models"] = 1
    else:
        raise RuntimeError(f"unknown params, train_style: {train_style}")

    if upload_python_packages is not None and isinstance(upload_python_packages, str):
        upload_python_packages = [upload_python_packages]
    if upload_python_packages is not None:
        _upload_python_packages: List[os.PathLike] = [
            Path(ii) for ii in upload_python_packages
        ]
        upload_python_packages = _upload_python_packages

    concurrent_learning_op = make_concurrent_learning_op(
        train_style,
        explore_style,
        fp_style,
        prep_train_config=prep_train_config,
        run_train_config=run_train_config,
        prep_explore_config=prep_explore_config,
        run_explore_config=run_explore_config,
        prep_fp_config=prep_fp_config,
        run_fp_config=run_fp_config,
        select_confs_config=select_confs_config,
        collect_data_config=collect_data_config,
        cl_step_config=cl_step_config,
        upload_python_packages=upload_python_packages,
    )
    scheduler = make_naive_exploration_scheduler(config)

    type_map = config["inputs"]["type_map"]
    numb_models = config["train"]["numb_models"]
    template_script_ = config["train"]["template_script"]
    if isinstance(template_script_, list):
        template_script = [json.loads(Path(ii).read_text()) for ii in template_script_]
    else:
        template_script = json.loads(Path(template_script_).read_text())
    train_config = config["train"]["config"]
    explore_config = config["explore"]["config"]
    if (
        "teacher_model_path" in explore_config
        and explore_config["teacher_model_path"] is not None
    ):
        assert os.path.exists(
            explore_config["teacher_model_path"]
        ), f"No such file: {explore_config['teacher_model_path']}"
        explore_config["teacher_model_path"] = BinaryFileInput(
            explore_config["teacher_model_path"], "pb"
        )
    fp_config = {}
    fp_inputs_config = config["fp"]["inputs_config"]
    fp_inputs = fp_styles[fp_style]["inputs"](**fp_inputs_config)

    fp_config["inputs"] = fp_inputs
    fp_config["run"] = config["fp"]["run_config"]

    if fp_style == "deepmd":
        assert (
            "teacher_model_path" in fp_config["run"]
        ), "Cannot find 'teacher_model_path' in config['fp']['run_config'] when fp_style == 'deepmd'"
        assert os.path.exists(
            fp_config["run"]["teacher_model_path"]
        ), f"No such file: {fp_config['run']['teacher_model_path']}"
        fp_config["run"]["teacher_model_path"] = BinaryFileInput(
            fp_config["run"]["teacher_model_path"], "pb"
        )

    init_data_prefix = config["inputs"]["init_data_prefix"]
    init_data = config["inputs"]["init_data_sys"]
    if init_data_prefix is not None:
        init_data = [os.path.join(init_data_prefix, ii) for ii in init_data]
    if isinstance(init_data, str):
        init_data = expand_sys_str(init_data)
    init_data = upload_artifact(init_data)
    iter_data = upload_artifact([])
    if init_models_paths is not None:
        init_models = upload_artifact(init_models_paths)
    else:
        init_models = None

    finetune_step = None
    optional_parameter = make_optional_parameter(
        config["inputs"]["mixed_type"],
    )

    if config["inputs"].get("do_finetune", False):
        finetune_step = make_finetune_step(
            config,
            prep_train_config,
            run_train_config,
            upload_python_packages,
            numb_models,
            template_script,
            train_config,
            init_models,
            init_data,
            iter_data,
        )
        init_models = finetune_step.outputs.artifacts["models"]
        template_script = finetune_step.outputs.parameters["template_script"]
        optional_parameter = make_optional_parameter(
            config["inputs"]["mixed_type"],
            finetune_mode="train-init",
        )

    # here the scheduler is passed as an input parameter to the concurrent_learning_op
    dpgen_step = Step(
        "dpgen-step",
        template=concurrent_learning_op,
        parameters={
            "type_map": type_map,
            "numb_models": numb_models,
            "template_script": template_script,
            "train_config": train_config,
            "explore_config": explore_config,
            "fp_config": fp_config,
            "exploration_scheduler": scheduler,
            "optional_parameter": optional_parameter,
        },
        artifacts={
            "init_models": init_models,
            "init_data": init_data,
            "iter_data": iter_data,
        },
    )
    return dpgen_step, finetune_step
def get_scheduler_ids(
    reuse_step,
):
    scheduler_ids = []
    for idx, ii in enumerate(reuse_step):
        if get_subkey(ii.key, 1) == "scheduler":
            scheduler_ids.append(idx)
    scheduler_keys = [reuse_step[ii].key for ii in scheduler_ids]
    assert (
        sorted(scheduler_keys) == scheduler_keys
    ), "The scheduler keys are not properly sorted"

    if len(scheduler_ids) == 0:
        logging.warning(
            "No scheduler step found in the workflow; no replacement will be done."
        )

    return scheduler_ids
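
# Sketch: assuming step keys of the form "iter-000000--scheduler" (an
# "<iteration>--<subkey>" layout), get_subkey(key, 1) == "scheduler" selects
# the scheduler step of every iteration, in iteration order.
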
def update_reuse_step_scheduler(
    reuse_step,
    scheduler_new,
):
    scheduler_ids = get_scheduler_ids(reuse_step)
    if len(scheduler_ids) == 0:
        return reuse_step

    # do replacement
    reuse_step[scheduler_ids[-1]].modify_output_parameter(
        "exploration_scheduler", scheduler_new
    )

    return reuse_step
def copy_scheduler_plans(
    scheduler_new,
    scheduler_old,
):
    if len(scheduler_old.stage_schedulers) == 0:
        return scheduler_new
    if len(scheduler_new.stage_schedulers) < len(scheduler_old.stage_schedulers):
        raise RuntimeError(
            "The new scheduler has fewer stages than the old scheduler; "
            "scheduler copy is not supported."
        )
    # the scheduler_old is planned; mimic the init call of the scheduler
    if scheduler_old.get_iteration() > -1:
        scheduler_new.plan_next_iteration()
    for ii in range(len(scheduler_old.stage_schedulers)):
        old_stage = scheduler_old.stage_schedulers[ii]
        old_reports = old_stage.get_reports()
        if old_stage.next_iteration() > 0:
            if ii != scheduler_new.get_stage():
                raise RuntimeError(
                    f"The stage {scheduler_new.get_stage()} of the new "
                    f"scheduler does not match the stage {ii} of the old "
                    f"scheduler, which should not happen."
                )
            for report in old_reports:
                scheduler_new.plan_next_iteration(report)
            if old_stage.complete() and (
                not scheduler_new.stage_schedulers[ii].complete()
            ):
                scheduler_new.force_stage_complete()
        else:
            break

    return scheduler_new
def submit_concurrent_learning(
    wf_config,
    reuse_step: Optional[List[ArgoStep]] = None,
    replace_scheduler: bool = False,
    no_submission: bool = False,
):
    # normalize args
    wf_config = normalize_args(wf_config)

    global_config_workflow(wf_config)

    dpgen_step, finetune_step = workflow_concurrent_learning(
        wf_config,
    )

    if reuse_step is not None and replace_scheduler:
        scheduler_new = copy.deepcopy(
            dpgen_step.inputs.parameters["exploration_scheduler"].value
        )
        idx_old = get_scheduler_ids(reuse_step)[-1]
        scheduler_old = (
            reuse_step[idx_old].inputs.parameters["exploration_scheduler"].value
        )
        scheduler_new = copy_scheduler_plans(scheduler_new, scheduler_old)
        exploration_report = (
            reuse_step[idx_old].inputs.parameters["exploration_report"].value
        )
        # plan the next iteration.
        # hack! trajs is set to None...
        conv, expl_task_grp, selector = scheduler_new.plan_next_iteration(
            exploration_report, trajs=None
        )
        # update the outputs of the scheduler step
        reuse_step[idx_old].modify_output_parameter(
            "converged",
            conv,
        )
        reuse_step[idx_old].modify_output_parameter(
            "exploration_scheduler",
            scheduler_new,
        )
        reuse_step[idx_old].modify_output_parameter(
            "expl_task_grp",
            expl_task_grp,
        )
        reuse_step[idx_old].modify_output_parameter(
            "conf_selector",
            selector,
        )
        # The modify-train-script step will be added as a reuse step, so the
        # following hack is not needed anymore:
        #     wf_config["inputs"]["do_finetune"] = False
        # Finetune will not be done again if the old process is reused.

    wf = Workflow(name=wf_config["name"])

    if wf_config["inputs"].get("do_finetune", False):
        assert finetune_step is not None
        wf.add(finetune_step)
    wf.add(dpgen_step)

    # for debug purposes, we may not really submit the wf
    if not no_submission:
        wf.submit(reuse_step=reuse_step)

    return wf
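
# Usage sketch:
#
#   wf = submit_concurrent_learning(wf_config)                      # submit
#   wf = submit_concurrent_learning(wf_config, no_submission=True)  # dry run
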
def successful_step_keys(wf):
    all_step_keys = []
    for step in wf.query_step():
        if step.key is not None and step.phase == "Succeeded":
            all_step_keys.append(step.key)
    return all_step_keys
def get_superop(key):
    if "prep-train" in key:
        return key.replace("prep-train", "prep-run-train")
    elif "run-train-" in key:
        return re.sub("run-train-[0-9]*", "prep-run-train", key)
    elif "prep-lmp" in key:
        return key.replace("prep-lmp", "prep-run-explore")
    elif "run-lmp-" in key:
        return re.sub("run-lmp-[0-9]*", "prep-run-explore", key)
    elif "prep-fp" in key:
        return key.replace("prep-fp", "prep-run-fp")
    elif "run-fp-" in key:
        return re.sub("run-fp-[0-9]*", "prep-run-fp", key)
    elif "prep-caly-input" in key:
        return key.replace("prep-caly-input", "prep-run-explore")
    elif "collect-run-calypso-" in key:
        return re.sub("collect-run-calypso-[0-9]*-[0-9]*", "prep-run-explore", key)
    elif "prep-run-dp-optim-" in key:
        return re.sub("prep-run-dp-optim-[0-9]*-[0-9]*", "prep-run-explore", key)
    elif "run-caly-model-devi" in key:
        return key.replace("run-caly-model-devi", "prep-run-explore")
    return None
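
# Folding examples (the "iter-000000--" prefixes are illustrative):
#
#   get_superop("iter-000000--prep-train")      # -> "iter-000000--prep-run-train"
#   get_superop("iter-000000--run-lmp-000012")  # -> "iter-000000--prep-run-explore"
#   get_superop("iter-000000--select-confs")    # -> None (not part of a super OP)
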
def fold_keys(all_step_keys):
    folded_keys = {}
    for key in all_step_keys:
        is_superop = False
        for superop in ["prep-run-train", "prep-run-explore", "prep-run-fp"]:
            if superop in key:
                if key not in folded_keys:
                    folded_keys[key] = []
                is_superop = True
                break
        if is_superop:
            continue
        superop = get_superop(key)
        # if its super OP succeeded, fold the step into its super OP
        if superop is not None and superop in all_step_keys:
            if superop not in folded_keys:
                folded_keys[superop] = []
            folded_keys[superop].append(key)
        else:
            folded_keys[key] = [key]
    for k, v in folded_keys.items():
        if v == []:
            folded_keys[k] = [k]
    return folded_keys
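
# Example result (illustrative keys): succeeded sub-steps are grouped under
# their succeeded super OP, while standalone steps map to themselves:
#
#   {
#       "iter-000000--prep-run-train": [
#           "iter-000000--prep-train",
#           "iter-000000--run-train-0000",
#       ],
#       "iter-000000--select-confs": ["iter-000000--select-confs"],
#   }
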
def get_resubmit_keys(
    wf,
):
    all_step_keys = successful_step_keys(wf)
    all_step_keys = matched_step_key(
        all_step_keys,
        [
            "prep-run-train",
            "prep-train",
            "run-train",
            "modify-train-script",
            "prep-caly-input",
            "collect-run-calypso",
            "prep-run-dp-optim",
            "run-caly-model-devi",
            "prep-run-explore",
            "prep-lmp",
            "run-lmp",
            "select-confs",
            "prep-run-fp",
            "prep-fp",
            "run-fp",
            "collect-data",
            "scheduler",
            "id",
        ],
    )
    all_step_keys = sort_slice_ops(
        all_step_keys,
        ["run-train", "run-lmp", "run-fp"],
    )
    folded_keys = fold_keys(all_step_keys)
    return folded_keys
def resubmit_concurrent_learning(
    wf_config,
    wfid,
    list_steps=False,
    reuse=None,
    replace_scheduler=False,
    fold=False,
):
    wf_config = normalize_args(wf_config)

    global_config_workflow(wf_config)

    old_wf = Workflow(id=wfid)
    folded_keys = get_resubmit_keys(old_wf)
    all_step_keys = sum(folded_keys.values(), [])

    if list_steps:
        prt_str = print_keys_in_nice_format(
            all_step_keys,
            ["run-train", "run-lmp", "run-fp"],
        )
        print(prt_str)

    if reuse is None:
        return None
    reuse_idx = expand_idx(reuse)
    reused_keys = [all_step_keys[ii] for ii in reuse_idx]
    if fold:
        reused_folded_keys = {}
        for key in reused_keys:
            superop = get_superop(key)
            if superop is not None:
                if superop not in reused_folded_keys:
                    reused_folded_keys[superop] = []
                reused_folded_keys[superop].append(key)
            else:
                reused_folded_keys[key] = [key]
        for k, v in reused_folded_keys.items():
            # reuse the super OP only if all steps within it are reused
            if v != [k] and k in folded_keys and set(v) == set(folded_keys[k]):
                reused_folded_keys[k] = [k]
        reused_keys = sum(reused_folded_keys.values(), [])
    reuse_step = old_wf.query_step(key=reused_keys)

    wf = submit_concurrent_learning(
        wf_config,
        reuse_step=reuse_step,
        replace_scheduler=replace_scheduler,
    )

    return wf
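
# Usage sketch (hypothetical workflow id; "reuse" entries use the index
# syntax understood by expand_idx, e.g. "0-9" for a range):
#
#   wf = resubmit_concurrent_learning(
#       wf_config,
#       "dpgen-abc12",
#       list_steps=True,
#       reuse=["0-9"],
#       fold=True,
#   )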