Source code for dpgen2.flow.dpgen_loop

import os
import pickle
from copy import (
    deepcopy,
)
from pathlib import (
    Path,
)
from typing import (
    List,
    Optional,
    Union,
)

import jsonpickle
from dflow import (
    InputArtifact,
    InputParameter,
    Inputs,
    OPTemplate,
    OutputArtifact,
    OutputParameter,
    Outputs,
    Step,
    Steps,
    Workflow,
    argo_len,
    argo_range,
    argo_sequence,
    download_artifact,
    if_expression,
    upload_artifact,
)
from dflow.python import (
    OP,
    OPIO,
    Artifact,
    BigParameter,
    HDF5Datasets,
    OPIOSign,
    PythonOPTemplate,
    Slices,
)

from dpgen2.exploration.report import (
    ExplorationReport,
)
from dpgen2.exploration.scheduler import (
    ExplorationScheduler,
)
from dpgen2.exploration.selector import (
    ConfSelector,
)
from dpgen2.exploration.task import (
    ExplorationTaskGroup,
)
from dpgen2.superop.block import (
    ConcurrentLearningBlock,
)
from dpgen2.utils import (
    dump_object_to_file,
    load_object_from_file,
)
from dpgen2.utils.step_config import (
    init_executor,
)
from dpgen2.utils.step_config import normalize as normalize_step_dict

cl_default_optional_parameter = {
    "data_mixed_type": False,
    "finetune_mode": "no",
}


[docs] def make_block_optional_parameter(cl_optional_parameter): return { "data_mixed_type": cl_optional_parameter["data_mixed_type"], "finetune_mode": cl_optional_parameter["finetune_mode"], }
[docs] def make_next_optional_parameter(optional_parameter): return { "data_mixed_type": optional_parameter["data_mixed_type"], "finetune_mode": "no", # not to do finetune for `next` loop }
[docs] class SchedulerWrapper(OP):
[docs] @classmethod def get_input_sign(cls): return OPIOSign( { "exploration_scheduler": BigParameter(ExplorationScheduler), "exploration_report": BigParameter(ExplorationReport), "trajs": Artifact(Union[List[Path], HDF5Datasets]), } )
[docs] @classmethod def get_output_sign(cls): return OPIOSign( { "converged": bool, "exploration_scheduler": BigParameter(ExplorationScheduler), "expl_task_grp": BigParameter(ExplorationTaskGroup), "conf_selector": ConfSelector, } )
[docs] @OP.exec_sign_check def execute( self, ip: OPIO, ) -> OPIO: scheduler = ip["exploration_scheduler"] report = ip["exploration_report"] trajs = ip["trajs"] conv, expl_task_grp, selector = scheduler.plan_next_iteration(report, trajs) return OPIO( { "converged": conv, "exploration_scheduler": scheduler, "expl_task_grp": expl_task_grp, "conf_selector": selector, } )
[docs] class MakeBlockId(OP):
[docs] @classmethod def get_input_sign(cls): return OPIOSign( { "exploration_scheduler": BigParameter(ExplorationScheduler), } )
[docs] @classmethod def get_output_sign(cls): return OPIOSign( { "block_id": str, } )
[docs] @OP.exec_sign_check def execute( self, ip: OPIO, ) -> OPIO: scheduler = ip["exploration_scheduler"] stage = scheduler.get_stage() iteration = scheduler.get_iteration() return OPIO( { "block_id": f"iter-{iteration:06d}", } )
[docs] class ConcurrentLearningLoop(Steps): def __init__( self, name: str, block_op: ConcurrentLearningBlock, step_config: dict = normalize_step_dict({}), upload_python_packages: Optional[List[os.PathLike]] = None, ): self._input_parameters = { "block_id": InputParameter(), "type_map": InputParameter(), "numb_models": InputParameter(type=int), "template_script": InputParameter(), "train_config": InputParameter(), "explore_config": InputParameter(), "conf_selector": InputParameter(), "fp_config": InputParameter(), "exploration_scheduler": InputParameter(), "optional_parameter": InputParameter(type=dict), "expl_task_grp": InputParameter(), } self._input_artifacts = { "init_models": InputArtifact(optional=True), "init_data": InputArtifact(), "iter_data": InputArtifact(), } self._output_parameters = { "exploration_scheduler": OutputParameter(), } self._output_artifacts = { "models": OutputArtifact(), "iter_data": OutputArtifact(), } super().__init__( name=name, inputs=Inputs( parameters=self._input_parameters, artifacts=self._input_artifacts, ), outputs=Outputs( parameters=self._output_parameters, artifacts=self._output_artifacts, ), ) self._my_keys = ["block", "scheduler", "id"] self._keys = self._my_keys[:1] + block_op.keys + self._my_keys[1:3] self.step_keys = {} for ii in self._my_keys: self.step_keys[ii] = "--".join( ["%s" % self.inputs.parameters["block_id"], ii] ) self = _loop( self, self.step_keys, name, block_op, step_config=step_config, upload_python_packages=upload_python_packages, ) @property def input_parameters(self): return self._input_parameters @property def input_artifacts(self): return self._input_artifacts @property def output_parameters(self): return self._output_parameters @property def output_artifacts(self): return self._output_artifacts @property def keys(self): return self._keys
[docs] class ConcurrentLearning(Steps): def __init__( self, name: str, block_op: ConcurrentLearningBlock, step_config: dict = normalize_step_dict({}), upload_python_packages: Optional[List[os.PathLike]] = None, ): self.loop = ConcurrentLearningLoop( name + "-loop", block_op, step_config=step_config, upload_python_packages=upload_python_packages, ) self._input_parameters = { "type_map": InputParameter(), "numb_models": InputParameter(type=int), "template_script": InputParameter(), "train_config": InputParameter(), "explore_config": InputParameter(), "fp_config": InputParameter(), "exploration_scheduler": InputParameter(), "optional_parameter": InputParameter( type=dict, value=cl_default_optional_parameter ), } self._input_artifacts = { "init_models": InputArtifact(optional=True), "init_data": InputArtifact(), "iter_data": InputArtifact(), } self._output_parameters = { "exploration_scheduler": OutputParameter(), } self._output_artifacts = { "models": OutputArtifact(), "iter_data": OutputArtifact(), } super().__init__( name=name, inputs=Inputs( parameters=self._input_parameters, artifacts=self._input_artifacts, ), outputs=Outputs( parameters=self._output_parameters, artifacts=self._output_artifacts, ), ) self._init_keys = ["scheduler", "id"] self.loop_key = "loop" self.step_keys = {} for ii in self._init_keys: self.step_keys[ii] = "--".join(["init", ii]) self = _dpgen( self, self.step_keys, name, self.loop, self.loop_key, step_config=step_config, upload_python_packages=upload_python_packages, ) @property def input_parameters(self): return self._input_parameters @property def input_artifacts(self): return self._input_artifacts @property def output_parameters(self): return self._output_parameters @property def output_artifacts(self): return self._output_artifacts @property def init_keys(self): return self._init_keys @property def loop_keys(self): return [self.loop_key] + self.loop.keys
def _loop( steps, step_keys, name: str, block_op: OPTemplate, step_config: dict = normalize_step_dict({}), upload_python_packages: Optional[List[os.PathLike]] = None, ): step_config = deepcopy(step_config) step_template_config = step_config.pop("template_config") step_executor = init_executor(step_config.pop("executor")) block_optional_parameter = make_block_optional_parameter( steps.inputs.parameters["optional_parameter"] ) block_common_parameters = { "block_id": steps.inputs.parameters["block_id"], "type_map": steps.inputs.parameters["type_map"], "numb_models": steps.inputs.parameters["numb_models"], "template_script": steps.inputs.parameters["template_script"], "train_config": steps.inputs.parameters["train_config"], "conf_selector": steps.inputs.parameters["conf_selector"], "fp_config": steps.inputs.parameters["fp_config"], "optional_parameter": block_optional_parameter, "explore_config": steps.inputs.parameters["explore_config"], "expl_task_grp": steps.inputs.parameters["expl_task_grp"], } block_step = Step( name=name + "-block", template=block_op, parameters=block_common_parameters, artifacts={ "init_models": steps.inputs.artifacts["init_models"], "init_data": steps.inputs.artifacts["init_data"], "iter_data": steps.inputs.artifacts["iter_data"], }, key=step_keys["block"], ) steps.add(block_step) scheduler_step = Step( name=name + "-scheduler", template=PythonOPTemplate( SchedulerWrapper, python_packages=upload_python_packages, **step_template_config, ), parameters={ "exploration_report": block_step.outputs.parameters["exploration_report"], "exploration_scheduler": steps.inputs.parameters["exploration_scheduler"], }, artifacts={ "trajs": block_step.outputs.artifacts["trajs"], }, key=step_keys["scheduler"], executor=step_executor, **step_config, ) scheduler_step.template.outputs.parameters[ "exploration_scheduler" ].global_name = "exploration_scheduler" steps.add(scheduler_step) id_step = Step( name=name + "-make-block-id", template=PythonOPTemplate( MakeBlockId, python_packages=upload_python_packages, **step_template_config, ), parameters={ "exploration_scheduler": scheduler_step.outputs.parameters[ "exploration_scheduler" ], }, artifacts={}, key=step_keys["id"], executor=step_executor, **step_config, ) steps.add(id_step) next_common_parameters = { "block_id": id_step.outputs.parameters["block_id"], "type_map": steps.inputs.parameters["type_map"], "numb_models": steps.inputs.parameters["numb_models"], "template_script": steps.inputs.parameters["template_script"], "train_config": steps.inputs.parameters["train_config"], "explore_config": steps.inputs.parameters["explore_config"], "conf_selector": scheduler_step.outputs.parameters["conf_selector"], "fp_config": steps.inputs.parameters["fp_config"], "exploration_scheduler": scheduler_step.outputs.parameters[ "exploration_scheduler" ], "optional_parameter": make_next_optional_parameter( steps.inputs.parameters["optional_parameter"] ), "expl_task_grp": scheduler_step.outputs.parameters["expl_task_grp"], } next_step = Step( name=name + "-next", template=steps, parameters=next_common_parameters, artifacts={ "init_models": block_step.outputs.artifacts["models"], "init_data": steps.inputs.artifacts["init_data"], "iter_data": block_step.outputs.artifacts["iter_data"], }, when="%s == false" % (scheduler_step.outputs.parameters["converged"]), ) steps.add(next_step) steps.outputs.parameters[ "exploration_scheduler" ].value_from_expression = if_expression( _if=(scheduler_step.outputs.parameters["converged"] == True), _then=scheduler_step.outputs.parameters["exploration_scheduler"], _else=next_step.outputs.parameters["exploration_scheduler"], ) steps.outputs.artifacts["models"].from_expression = if_expression( _if=(scheduler_step.outputs.parameters["converged"] == True), _then=block_step.outputs.artifacts["models"], _else=next_step.outputs.artifacts["models"], ) steps.outputs.artifacts["iter_data"].from_expression = if_expression( _if=(scheduler_step.outputs.parameters["converged"] == True), _then=block_step.outputs.artifacts["iter_data"], _else=next_step.outputs.artifacts["iter_data"], ) return steps def _dpgen( steps, step_keys, name, loop_op, loop_key, step_config: dict = normalize_step_dict({}), upload_python_packages: Optional[List[os.PathLike]] = None, ): step_config = deepcopy(step_config) step_template_config = step_config.pop("template_config") step_executor = init_executor(step_config.pop("executor")) scheduler_step = Step( name=name + "-scheduler", template=PythonOPTemplate( SchedulerWrapper, python_packages=upload_python_packages, **step_template_config, ), parameters={ "exploration_report": None, "exploration_scheduler": steps.inputs.parameters["exploration_scheduler"], }, artifacts={ "trajs": None, }, key=step_keys["scheduler"], executor=step_executor, **step_config, ) scheduler_step.template.outputs.parameters[ "exploration_scheduler" ].global_name = "exploration_scheduler" steps.add(scheduler_step) id_step = Step( name=name + "-make-block-id", template=PythonOPTemplate( MakeBlockId, python_packages=upload_python_packages, **step_template_config, ), parameters={ "exploration_scheduler": scheduler_step.outputs.parameters[ "exploration_scheduler" ], }, artifacts={}, key=step_keys["id"], executor=step_executor, **step_config, ) steps.add(id_step) common_parameters = { "block_id": id_step.outputs.parameters["block_id"], "type_map": steps.inputs.parameters["type_map"], "numb_models": steps.inputs.parameters["numb_models"], "template_script": steps.inputs.parameters["template_script"], "train_config": steps.inputs.parameters["train_config"], "explore_config": steps.inputs.parameters["explore_config"], "conf_selector": scheduler_step.outputs.parameters["conf_selector"], "fp_config": steps.inputs.parameters["fp_config"], "exploration_scheduler": scheduler_step.outputs.parameters[ "exploration_scheduler" ], "optional_parameter": steps.inputs.parameters["optional_parameter"], "expl_task_grp": scheduler_step.outputs.parameters["expl_task_grp"], } loop_step = Step( name=name + "-loop", template=loop_op, parameters=common_parameters, artifacts={ "init_models": steps.inputs.artifacts["init_models"], "init_data": steps.inputs.artifacts["init_data"], "iter_data": steps.inputs.artifacts["iter_data"], }, key="--".join(["%s" % id_step.outputs.parameters["block_id"], loop_key]), ) steps.add(loop_step) steps.outputs.parameters[ "exploration_scheduler" ].value_from_parameter = loop_step.outputs.parameters["exploration_scheduler"] steps.outputs.artifacts["models"]._from = loop_step.outputs.artifacts["models"] steps.outputs.artifacts["iter_data"]._from = loop_step.outputs.artifacts[ "iter_data" ] return steps