# Source code for dpgen2.superop.block

from dflow import (
    InputParameter,
    OutputParameter,
    Inputs,
    InputArtifact,
    Outputs,
    OutputArtifact,
    Workflow,
    Step,
    Steps,
    upload_artifact,
    download_artifact,
    argo_range,
    argo_len,
    argo_sequence,
    OPTemplate,
)
from dflow.python import(
    PythonOPTemplate,
    OP,
    OPIO,
    OPIOSign,
    Artifact,
    Slices,
)
from dpgen2.utils.step_config import normalize as normalize_step_dict
from dpgen2.utils.step_config import init_executor

import os
from typing import Any, Dict, Optional, Set, List
from pathlib import Path
from copy import deepcopy

class ConcurrentLearningBlock(Steps):
    """A dflow ``Steps`` template implementing one concurrent-learning block.

    The block wires five sub-steps in sequence (see ``_block_cl``):
    prep-run-dp-train -> prep-run-lmp -> select-confs -> prep-run-fp
    -> collect-data.

    Parameters
    ----------
    name : str
        Name of this ``Steps`` template. It is also the prefix of every
        sub-step name.
    prep_run_dp_train_op : OPTemplate
        Super-OP that prepares and runs model training.
    prep_run_lmp_op : OPTemplate
        Super-OP that prepares and runs the LAMMPS exploration.
    select_confs_op : OP
        Python OP that selects configurations from the exploration output.
    prep_run_fp_op : OPTemplate
        Super-OP that prepares and runs the first-principles labeling.
    collect_data_op : OP
        Python OP that merges the labeled data into the iteration data.
    select_confs_config, collect_data_config : dict, optional
        Step configs for the select-confs and collect-data steps. They are
        forwarded to ``_block_cl``, which reads their ``'template_config'``
        and ``'executor'`` entries. ``None`` means ``normalize_step_dict({})``.
    upload_python_packages : list of path-like, optional
        Passed as ``python_packages`` to the ``PythonOPTemplate`` of the
        select-confs and collect-data steps.
    """

    def __init__(
        self,
        name: str,
        prep_run_dp_train_op: OPTemplate,
        prep_run_lmp_op: OPTemplate,
        select_confs_op: OP,
        prep_run_fp_op: OPTemplate,
        collect_data_op: OP,
        select_confs_config: Optional[dict] = None,
        collect_data_config: Optional[dict] = None,
        upload_python_packages: Optional[List[os.PathLike]] = None,
    ):
        # Build fresh default configs per instance instead of sharing one dict
        # that was created once at import time (mutable-default pitfall).
        if select_confs_config is None:
            select_confs_config = normalize_step_dict({})
        if collect_data_config is None:
            collect_data_config = normalize_step_dict({})

        self._input_parameters = {
            "block_id": InputParameter(),
            "type_map": InputParameter(),
            "numb_models": InputParameter(type=int),
            "template_script": InputParameter(),
            "train_config": InputParameter(),
            "lmp_config": InputParameter(),
            "conf_selector": InputParameter(),
            "fp_config": InputParameter(),
            "lmp_task_grp": InputParameter(),
        }
        self._input_artifacts = {
            "init_models": InputArtifact(optional=True),
            "init_data": InputArtifact(),
            "iter_data": InputArtifact(),
        }
        self._output_parameters = {
            "exploration_report": OutputParameter(),
        }
        self._output_artifacts = {
            "models": OutputArtifact(),
            "iter_data": OutputArtifact(),
            "trajs": OutputArtifact(),
        }

        super().__init__(
            name=name,
            inputs=Inputs(
                parameters=self._input_parameters,
                artifacts=self._input_artifacts,
            ),
            outputs=Outputs(
                parameters=self._output_parameters,
                artifacts=self._output_artifacts,
            ),
        )

        # Keys of the steps created directly by this block (the other
        # steps' keys are owned by the sub super-OPs).
        self._my_keys = ['select-confs', 'collect-data']
        # Ordered as the steps execute: train, lmp, select-confs, fp,
        # collect-data.
        self._keys = (
            prep_run_dp_train_op.keys
            + prep_run_lmp_op.keys
            + self._my_keys[:1]
            + prep_run_fp_op.keys
            + self._my_keys[1:2]
        )
        # Keys of the form "<block_id>--<step>". The block_id part is the
        # string form of the input parameter; presumably a template
        # expression resolved at runtime by dflow -- TODO confirm.
        self.step_keys = {
            key: '--'.join(["%s" % self.inputs.parameters["block_id"], key])
            for key in self._my_keys
        }

        # _block_cl adds the sub-steps to `self` in place; its return value
        # is `self`, so there is no need to rebind it.
        _block_cl(
            self,
            self.step_keys,
            name,
            prep_run_dp_train_op,
            prep_run_lmp_op,
            select_confs_op,
            prep_run_fp_op,
            collect_data_op,
            select_confs_config=select_confs_config,
            collect_data_config=collect_data_config,
            upload_python_packages=upload_python_packages,
        )

    @property
    def input_parameters(self):
        """Input parameter declarations of this block."""
        return self._input_parameters

    @property
    def input_artifacts(self):
        """Input artifact declarations of this block."""
        return self._input_artifacts

    @property
    def output_parameters(self):
        """Output parameter declarations of this block."""
        return self._output_parameters

    @property
    def output_artifacts(self):
        """Output artifact declarations of this block."""
        return self._output_artifacts

    @property
    def keys(self):
        """Step keys of this block, in execution order."""
        return self._keys
def _block_cl(
    block_steps: Steps,
    step_keys: Dict[str, Any],
    name: str,
    prep_run_dp_train_op: OPTemplate,
    prep_run_lmp_op: OPTemplate,
    select_confs_op: OP,
    prep_run_fp_op: OPTemplate,
    collect_data_op: OP,
    select_confs_config: Optional[dict] = None,
    collect_data_config: Optional[dict] = None,
    upload_python_packages: Optional[List[os.PathLike]] = None,
):
    """Add the concurrent-learning sub-steps to ``block_steps`` and wire I/O.

    Data flow between the added steps:

    * prep-run-dp-train consumes the block's ``init_models``, ``init_data``
      and ``iter_data`` artifacts and produces ``models``.
    * prep-run-lmp consumes ``models`` and produces ``trajs`` and
      ``model_devis``.
    * select-confs consumes ``trajs`` and ``model_devis`` and produces
      ``confs`` plus a ``report`` parameter.
    * prep-run-fp consumes ``confs`` and produces ``labeled_data``.
    * collect-data consumes the block's ``iter_data`` and ``labeled_data``
      and produces the new ``iter_data``.

    Parameters
    ----------
    block_steps : Steps
        The ``Steps`` object to populate. It is modified in place.
    step_keys : dict
        Must provide keys for ``'select-confs'`` and ``'collect-data'``.
    name : str
        Prefix for every sub-step name.
    select_confs_config, collect_data_config : dict, optional
        Step configs. Each must contain ``'template_config'`` (expanded into
        ``PythonOPTemplate``) and ``'executor'`` (passed to
        ``init_executor``). All remaining entries are expanded into
        ``Step``. ``None`` means ``normalize_step_dict({})``.
    upload_python_packages : list of path-like, optional
        Forwarded as ``python_packages`` to both ``PythonOPTemplate``s.

    Returns
    -------
    Steps
        ``block_steps`` itself.
    """
    # Normalize per call (avoids a shared import-time default) and copy so
    # the pops below never mutate the caller's dicts.
    if select_confs_config is None:
        select_confs_config = normalize_step_dict({})
    if collect_data_config is None:
        collect_data_config = normalize_step_dict({})
    select_confs_config = deepcopy(select_confs_config)
    collect_data_config = deepcopy(collect_data_config)
    select_confs_template_config = select_confs_config.pop('template_config')
    collect_data_template_config = collect_data_config.pop('template_config')
    select_confs_executor = init_executor(select_confs_config.pop('executor'))
    collect_data_executor = init_executor(collect_data_config.pop('executor'))

    in_params = block_steps.inputs.parameters
    in_arts = block_steps.inputs.artifacts

    def _step_key(suffix: str) -> str:
        # Key of the form "<block_id>--<suffix>", matching the select-confs
        # and collect-data keys built by ConcurrentLearningBlock.
        return '--'.join(["%s" % in_params["block_id"], suffix])

    prep_run_dp_train = Step(
        name=name + '-prep-run-dp-train',
        template=prep_run_dp_train_op,
        parameters={
            "block_id": in_params['block_id'],
            "train_config": in_params['train_config'],
            "numb_models": in_params['numb_models'],
            "template_script": in_params['template_script'],
        },
        artifacts={
            "init_models": in_arts['init_models'],
            "init_data": in_arts['init_data'],
            "iter_data": in_arts['iter_data'],
        },
        key=_step_key("prep-run-train"),
    )
    block_steps.add(prep_run_dp_train)

    prep_run_lmp = Step(
        name=name + '-prep-run-lmp',
        template=prep_run_lmp_op,
        parameters={
            "block_id": in_params['block_id'],
            "lmp_config": in_params['lmp_config'],
            "lmp_task_grp": in_params['lmp_task_grp'],
        },
        artifacts={
            "models": prep_run_dp_train.outputs.artifacts['models'],
        },
        key=_step_key("prep-run-lmp"),
    )
    block_steps.add(prep_run_lmp)

    select_confs = Step(
        name=name + '-select-confs',
        template=PythonOPTemplate(
            select_confs_op,
            # None: store the "confs" output artifact without archiving.
            output_artifact_archive={
                "confs": None
            },
            python_packages=upload_python_packages,
            **select_confs_template_config,
        ),
        parameters={
            "conf_selector": in_params['conf_selector'],
            "type_map": in_params["type_map"],
        },
        artifacts={
            "trajs": prep_run_lmp.outputs.artifacts['trajs'],
            "model_devis": prep_run_lmp.outputs.artifacts['model_devis'],
        },
        key=step_keys['select-confs'],
        executor=select_confs_executor,
        **select_confs_config,
    )
    block_steps.add(select_confs)

    prep_run_fp = Step(
        name=name + '-prep-run-fp',
        template=prep_run_fp_op,
        parameters={
            "block_id": in_params['block_id'],
            "fp_config": in_params['fp_config'],
            "type_map": in_params["type_map"],
        },
        artifacts={
            "confs": select_confs.outputs.artifacts['confs'],
        },
        key=_step_key("prep-run-fp"),
    )
    block_steps.add(prep_run_fp)

    collect_data = Step(
        name=name + '-collect-data',
        template=PythonOPTemplate(
            collect_data_op,
            output_artifact_archive={
                "iter_data": None
            },
            python_packages=upload_python_packages,
            **collect_data_template_config,
        ),
        parameters={
            # The collect-data OP takes the block id under the name "name".
            "name": in_params["block_id"],
            "type_map": in_params["type_map"],
        },
        artifacts={
            "iter_data": in_arts['iter_data'],
            "labeled_data": prep_run_fp.outputs.artifacts['labeled_data'],
        },
        key=step_keys['collect-data'],
        executor=collect_data_executor,
        **collect_data_config,
    )
    block_steps.add(collect_data)

    # Expose the sub-steps' results as the block's outputs.
    block_steps.outputs.parameters["exploration_report"].value_from_parameter = \
        select_confs.outputs.parameters["report"]
    block_steps.outputs.artifacts["models"]._from = \
        prep_run_dp_train.outputs.artifacts["models"]
    block_steps.outputs.artifacts["iter_data"]._from = \
        collect_data.outputs.artifacts["iter_data"]
    block_steps.outputs.artifacts["trajs"]._from = \
        prep_run_lmp.outputs.artifacts["trajs"]

    return block_steps