Source code for dpgen2.entrypoint.submit

import glob
import os
import pickle

import dpdata
from pathlib import Path
from dflow import (
    InputParameter,
    OutputParameter,
    Inputs,
    InputArtifact,
    Outputs,
    OutputArtifact,
    Workflow,
    Step,
    Steps,
    upload_artifact,
    download_artifact,
    S3Artifact,
    argo_range,
)
from dflow.python import (
    PythonOPTemplate,
    OP,
    OPIO,
    OPIOSign,
    Artifact,
    upload_packages,
    FatalError,
    TransientError,
)

from dpgen2.op import (
    PrepDPTrain,
    RunDPTrain,
    PrepLmp,
    RunLmp,
    PrepVasp,
    RunVasp,
    SelectConfs,
    CollectData,
)
from dpgen2.superop import (
    PrepRunDPTrain,
    PrepRunLmp,
    PrepRunFp,
    ConcurrentLearningBlock,
)
from dpgen2.flow import (
    ConcurrentLearning,
)
from dpgen2.fp import (
    VaspInputs,
)
from dpgen2.exploration.scheduler import (
    ExplorationScheduler,
    ConvergenceCheckStageScheduler,
)
from dpgen2.exploration.task import (
    ExplorationStage,
    ExplorationTask,
    NPTTaskGroup,
)
from dpgen2.exploration.selector import (
    ConfSelectorLammpsFrames,
    TrustLevel,
)
from dpgen2.constants import (
    default_image,
    default_host,
)
from dpgen2.utils import (
    dump_object_to_file,
    load_object_from_file,
    normalize_alloy_conf_dict,
    generate_alloy_conf_file_content,
    dflow_config,
)
from dpgen2.utils.step_config import normalize as normalize_step_dict
from dpgen2.utils.dflow_query import (
    sort_slice_ops,
    print_keys_in_nice_format,
)
from typing import (
    List,
    Optional,
    Union,
)

default_config = normalize_step_dict(
    {
        "template_config" : {
            "image" : default_image,
        }
    }
)

def expand_sys_str(root_dir: Union[str, Path]) -> List[str]:
    """Recursively list all deepmd systems, i.e. directories containing a
    ``type.raw`` file, under ``root_dir`` (including ``root_dir`` itself)."""
    root_dir = Path(root_dir)
    matches = [str(d) for d in root_dir.rglob("*") if (d / "type.raw").is_file()]
    if (root_dir / "type.raw").is_file():
        matches.append(str(root_dir))
    return matches

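# Usage sketch for expand_sys_str (hypothetical layout, not part of the
# original module): build a throwaway deepmd-style tree in a temporary
# directory and confirm that every directory holding a `type.raw` is found.
def _example_expand_sys_str():
    import tempfile
    with tempfile.TemporaryDirectory() as root:
        for name in ("sys-000", "sys-001"):
            dd = Path(root) / name
            dd.mkdir()
            (dd / "type.raw").write_text("0\n")
        found = sorted(expand_sys_str(root))
        assert found == sorted(str(Path(root) / n) for n in ("sys-000", "sys-001"))
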
def make_concurrent_learning_op(
    train_style: str = "dp",
    explore_style: str = "lmp",
    fp_style: str = "vasp",
    prep_train_config: dict = default_config,
    run_train_config: dict = default_config,
    prep_explore_config: dict = default_config,
    run_explore_config: dict = default_config,
    prep_fp_config: dict = default_config,
    run_fp_config: dict = default_config,
    select_confs_config: dict = default_config,
    collect_data_config: dict = default_config,
    cl_step_config: dict = default_config,
    upload_python_package: Optional[str] = None,
):
    if train_style == "dp":
        prep_run_train_op = PrepRunDPTrain(
            "prep-run-dp-train",
            PrepDPTrain,
            RunDPTrain,
            prep_config=prep_train_config,
            run_config=run_train_config,
            upload_python_package=upload_python_package,
        )
    else:
        raise RuntimeError(f"unknown train_style {train_style}")
    if explore_style == "lmp":
        prep_run_explore_op = PrepRunLmp(
            "prep-run-lmp",
            PrepLmp,
            RunLmp,
            prep_config=prep_explore_config,
            run_config=run_explore_config,
            upload_python_package=upload_python_package,
        )
    else:
        raise RuntimeError(f"unknown explore_style {explore_style}")
    if fp_style == "vasp":
        prep_run_fp_op = PrepRunFp(
            "prep-run-vasp",
            PrepVasp,
            RunVasp,
            prep_config=prep_fp_config,
            run_config=run_fp_config,
            upload_python_package=upload_python_package,
        )
    else:
        raise RuntimeError(f"unknown fp_style {fp_style}")

    # the concurrent-learning block: train -> explore -> select -> fp -> collect
    block_cl_op = ConcurrentLearningBlock(
        "concurrent-learning-block",
        prep_run_train_op,
        prep_run_explore_op,
        SelectConfs,
        prep_run_fp_op,
        CollectData,
        select_confs_config=select_confs_config,
        collect_data_config=collect_data_config,
        upload_python_package=upload_python_package,
    )
    # the dpgen concurrent-learning loop built on top of the block
    dpgen_op = ConcurrentLearning(
        "concurrent-learning",
        block_cl_op,
        upload_python_package=upload_python_package,
        step_config=cl_step_config,
    )
    return dpgen_op

def make_conf_list(
    conf_list,
    type_map,
    fmt="vasp/poscar",
):
    # case 1: a list of file globs -- load the confs and convert each one
    # to lammps/lmp format
    if isinstance(conf_list, list):
        conf_list_fname = []
        for jj in conf_list:
            confs = sorted(glob.glob(jj))
            conf_list_fname = conf_list_fname + confs
        conf_list = []
        for ii in conf_list_fname:
            ss = dpdata.System(ii, type_map=type_map, fmt=fmt)
            ss.to("lammps/lmp", "tmp.lmp")
            conf_list.append(Path("tmp.lmp").read_text())
        if Path("tmp.lmp").is_file():
            os.remove("tmp.lmp")
    # case 2: a dict describing alloy configurations to be generated
    elif isinstance(conf_list, dict):
        conf_list["type_map"] = type_map
        i_dict = normalize_alloy_conf_dict(conf_list)
        conf_list = generate_alloy_conf_file_content(**i_dict)
    else:
        raise RuntimeError(f"unknown input format of conf_list: {type(conf_list)}")
    return conf_list

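# Hedged usage sketch for make_conf_list (paths, elements, and the alloy
# dict are illustrative, not from the original module):
#
#   # 1. a list of file globs; each conf is converted to lammps/lmp text
#   make_conf_list(["confs/POSCAR.*"], type_map=["Mg", "Al"])
#   # 2. an alloy-description dict, normalized by normalize_alloy_conf_dict
#   #    and expanded by generate_alloy_conf_file_content
#   make_conf_list({...}, type_map=["Mg", "Al"])
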
def make_naive_exploration_scheduler(
    config,
):
    # use an npt task group for every stage
    model_devi_jobs = config["model_devi_jobs"]
    sys_configs = config["sys_configs"]
    mass_map = config["mass_map"]
    type_map = config["type_map"]
    numb_models = config["numb_models"]
    fp_task_max = config["fp_task_max"]
    conv_accuracy = config["conv_accuracy"]
    max_numb_iter = config["max_numb_iter"]
    fatal_at_max = config.get("fatal_at_max", True)
    scheduler = ExplorationScheduler()

    for job in model_devi_jobs:
        # task group
        tgroup = NPTTaskGroup()
        # the expansion of sys_idx is ignored here;
        # collect the file contents of the initial md configurations
        sys_idx = job["sys_idx"]
        conf_list = []
        for ii in sys_idx:
            conf_list += make_conf_list(sys_configs[ii], type_map)
        # add the configurations to the task group
        n_sample = job.get("n_sample")
        tgroup.set_conf(
            conf_list,
            n_sample=n_sample,
        )
        temps = job["temps"]
        press = job["press"]
        trj_freq = job["trj_freq"]
        nsteps = job["nsteps"]
        ensemble = job["ensemble"]
        # md settings
        tgroup.set_md(
            numb_models,
            mass_map,
            temps=temps,
            press=press,
            ens=ensemble,
            nsteps=nsteps,
        )
        tasks = tgroup.make_task()
        # stage
        stage = ExplorationStage()
        stage.add_task_group(tasks)
        # trust level
        trust_level = TrustLevel(
            config["model_devi_f_trust_lo"],
            config["model_devi_f_trust_hi"],
        )
        # selector
        selector = ConfSelectorLammpsFrames(
            trust_level,
            fp_task_max,
        )
        # stage scheduler: one convergence-checked stage per model_devi job
        stage_scheduler = ConvergenceCheckStageScheduler(
            stage,
            selector,
            conv_accuracy=conv_accuracy,
            max_numb_iter=max_numb_iter,
            fatal_at_max=fatal_at_max,
        )
        scheduler.add_stage_scheduler(stage_scheduler)

    return scheduler

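# A sketch of one `model_devi_jobs` entry consumed above. The keys are the
# ones read by make_naive_exploration_scheduler; the values are illustrative.
#
#   {
#       "sys_idx": [0, 1],
#       "n_sample": 3,
#       "temps": [50.0, 100.0],
#       "press": [1.0],
#       "trj_freq": 10,
#       "nsteps": 1000,
#       "ensemble": "npt",
#   }
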
def get_kspacing_kgamma_from_incar(
    fname,
):
    # default to None so the caller can detect a missing KSPACING/KGAMMA
    ks = None
    kg = None
    with open(fname) as fp:
        lines = fp.readlines()
    for ii in lines:
        if "KSPACING" in ii:
            ks = float(ii.split("=")[1])
        if "KGAMMA" in ii:
            if "T" in ii.split("=")[1]:
                kg = True
            elif "F" in ii.split("=")[1]:
                kg = False
            else:
                raise RuntimeError(f"invalid kgamma value {ii.split('=')[1]}")
    return ks, kg

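# Usage sketch for get_kspacing_kgamma_from_incar: write a minimal INCAR to
# a temporary file and parse it back (the file content is illustrative).
def _example_get_kspacing_kgamma():
    import tempfile
    with tempfile.NamedTemporaryFile("w", suffix=".incar", delete=False) as fp:
        fp.write("KSPACING = 0.5\nKGAMMA = .TRUE.\n")
        fname = fp.name
    ks, kg = get_kspacing_kgamma_from_incar(fname)
    assert (ks, kg) == (0.5, True)
    os.remove(fname)
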
def workflow_concurrent_learning(
    config,
):
    default_config = normalize_step_dict(config.get("default_config", {}))

    train_style = config["train_style"]
    explore_style = config["explore_style"]
    fp_style = config["fp_style"]
    prep_train_config = normalize_step_dict(config.get("prep_train_config", default_config))
    run_train_config = normalize_step_dict(config.get("run_train_config", default_config))
    prep_explore_config = normalize_step_dict(config.get("prep_explore_config", default_config))
    run_explore_config = normalize_step_dict(config.get("run_explore_config", default_config))
    prep_fp_config = normalize_step_dict(config.get("prep_fp_config", default_config))
    run_fp_config = normalize_step_dict(config.get("run_fp_config", default_config))
    select_confs_config = normalize_step_dict(config.get("select_confs_config", default_config))
    collect_data_config = normalize_step_dict(config.get("collect_data_config", default_config))
    cl_step_config = normalize_step_dict(config.get("cl_step_config", default_config))
    upload_python_package = config.get("upload_python_package", None)
    init_models_paths = config.get("training_iter0_model_path")

    concurrent_learning_op = make_concurrent_learning_op(
        train_style,
        explore_style,
        fp_style,
        prep_train_config=prep_train_config,
        run_train_config=run_train_config,
        prep_explore_config=prep_explore_config,
        run_explore_config=run_explore_config,
        prep_fp_config=prep_fp_config,
        run_fp_config=run_fp_config,
        select_confs_config=select_confs_config,
        collect_data_config=collect_data_config,
        cl_step_config=cl_step_config,
        upload_python_package=upload_python_package,
    )
    scheduler = make_naive_exploration_scheduler(config)

    type_map = config["type_map"]
    numb_models = config["numb_models"]
    template_script = config["default_training_param"]
    train_config = {}
    lmp_config = config.get("lmp_config", {})
    fp_config = config.get("fp_config", {})
    kspacing, kgamma = get_kspacing_kgamma_from_incar(config["fp_incar"])
    fp_pp_files = config["fp_pp_files"]
    incar_file = config["fp_incar"]
    fp_inputs = VaspInputs(
        kspacing=kspacing,
        kgamma=kgamma,
        incar_template_name=incar_file,
        potcar_names=fp_pp_files,
    )
    init_data = config["init_data_sys"]
    if isinstance(init_data, str):
        init_data = expand_sys_str(init_data)
    init_data = upload_artifact(init_data)
    iter_data = upload_artifact([])
    if init_models_paths is not None:
        init_models = upload_artifact(init_models_paths)
    else:
        init_models = None

    # the exploration scheduler is passed to the concurrent-learning op
    # as an input parameter
    dpgen_step = Step(
        "dpgen-step",
        template=concurrent_learning_op,
        parameters={
            "type_map": type_map,
            "numb_models": numb_models,
            "template_script": template_script,
            "train_config": train_config,
            "lmp_config": lmp_config,
            "fp_config": fp_config,
            "fp_inputs": fp_inputs,
            "exploration_scheduler": scheduler,
        },
        artifacts={
            "init_models": init_models,
            "init_data": init_data,
            "iter_data": iter_data,
        },
    )
    return dpgen_step

def wf_global_workflow(
    wf_config,
):
    # apply the global dflow configuration
    dflow_config_data = wf_config.get("dflow_config", None)
    dflow_config(dflow_config_data)

    # lebesgue context
    from dflow.plugins.lebesgue import LebesgueContext

    lb_context_config = wf_config.get("lebesgue_context_config", None)
    if lb_context_config:
        lebesgue_context = LebesgueContext(
            **lb_context_config,
        )
    else:
        lebesgue_context = None
    return lebesgue_context

def submit_concurrent_learning(
    wf_config,
    reuse_step=None,
):
    context = wf_global_workflow(wf_config)
    dpgen_step = workflow_concurrent_learning(wf_config)
    wf = Workflow(name="dpgen", context=context)
    wf.add(dpgen_step)
    wf.submit(reuse_step=reuse_step)
    return wf

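# Hedged usage sketch (not executed here; requires a reachable dflow/argo
# deployment and a full dpgen2 input file, e.g. an `input.json`):
#
#   import json
#   with open("input.json") as fp:
#       wf_config = json.load(fp)
#   wf = submit_concurrent_learning(wf_config)
#   print(wf.id)
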
def expand_idx(in_list):
    ret = []
    for ii in in_list:
        if isinstance(ii, int):
            ret.append(ii)
        elif isinstance(ii, str):
            step_str = ii.split(":")
            if len(step_str) > 1:
                step = int(step_str[1])
            else:
                step = 1
            range_str = step_str[0].split("-")
            if len(range_str) == 2:
                ret += range(int(range_str[0]), int(range_str[1]), step)
            elif len(range_str) == 1:
                ret += [int(range_str[0])]
            else:
                raise RuntimeError(f"unexpected range string {step_str[0]}")
    ret = sorted(list(set(ret)))
    return ret

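# Usage sketch for expand_idx: plain ints pass through, "a-b:s" expands to
# range(a, b, s), and the result is sorted with duplicates removed.
def _example_expand_idx():
    assert expand_idx([0, "4-8:2", "10", 4]) == [0, 4, 6, 10]
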
def successful_step_keys(wf):
    all_step_keys_ = wf.query_keys_of_steps()
    wf_info = wf.query()
    all_step_keys = []
    for ii in all_step_keys_:
        if wf_info.get_step(key=ii)[0]["phase"] == "Succeeded":
            all_step_keys.append(ii)
    return all_step_keys

def resubmit_concurrent_learning(
    wf_config,
    wfid,
    list_steps=False,
    reuse=None,
):
    # applies the global dflow config, which is needed before the old
    # workflow can be queried
    wf_global_workflow(wf_config)

    old_wf = Workflow(id=wfid)
    all_step_keys = successful_step_keys(old_wf)
    all_step_keys = sort_slice_ops(
        all_step_keys,
        ["run-train", "run-lmp", "run-fp"],
    )
    if list_steps:
        prt_str = print_keys_in_nice_format(
            all_step_keys,
            ["run-train", "run-lmp", "run-fp"],
        )
        print(prt_str)
    if reuse is None:
        return None

    reuse_idx = expand_idx(reuse)
    reuse_step = []
    old_wf_info = old_wf.query()
    for ii in reuse_idx:
        reuse_step += old_wf_info.get_step(key=all_step_keys[ii])
    # submit_concurrent_learning builds its own context from wf_config,
    # so no context argument is passed here
    wf = submit_concurrent_learning(
        wf_config,
        reuse_step=reuse_step,
    )
    return wf

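# Hedged usage sketch (the workflow id and indices are illustrative):
#
#   # print the reusable (succeeded) steps of a finished workflow
#   resubmit_concurrent_learning(wf_config, "dpgen-xxxxx", list_steps=True)
#   # resubmit, reusing steps 0, 1 and 2 of the old workflow
#   wf = resubmit_concurrent_learning(wf_config, "dpgen-xxxxx", reuse=["0-3"])
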