from dflow import (
InputParameter,
OutputParameter,
Inputs,
InputArtifact,
Outputs,
OutputArtifact,
Workflow,
Step,
Steps,
upload_artifact,
download_artifact,
argo_range,
argo_len,
argo_sequence,
if_expression,
OPTemplate,
)
from dflow.python import(
PythonOPTemplate,
OP,
OPIO,
OPIOSign,
Artifact,
Slices,
BigParameter,
)
import pickle, jsonpickle, os
from typing import (
List,
Optional,
)
from pathlib import Path
from dpgen2.exploration.scheduler import ExplorationScheduler
from dpgen2.exploration.report import ExplorationReport
from dpgen2.exploration.task import ExplorationTaskGroup
from dpgen2.exploration.selector import ConfSelector
from dpgen2.superop.block import ConcurrentLearningBlock
from dpgen2.utils import (
load_object_from_file,
dump_object_to_file,
)
from dpgen2.utils.step_config import normalize as normalize_step_dict
from dpgen2.utils.step_config import init_executor
from copy import deepcopy
class SchedulerWrapper(OP):
    """dflow OP wrapping an :class:`ExplorationScheduler`.

    Feeds the exploration report and trajectories of a finished block to the
    scheduler, which decides whether the exploration has converged and, if
    not, plans the task group and conf selector for the next iteration.
    """

    @classmethod
    def get_input_sign(cls):
        # NOTE(review): this method was missing from the extracted source;
        # it is reconstructed from the keys `execute` reads from `ip` —
        # confirm the signs against the upstream definition.
        return OPIOSign({
            "exploration_scheduler": BigParameter(ExplorationScheduler),
            "exploration_report": BigParameter(ExplorationReport),
            "trajs": Artifact(List[Path]),
        })

    @classmethod
    def get_output_sign(cls):
        return OPIOSign({
            "converged": bool,
            "exploration_scheduler": BigParameter(ExplorationScheduler),
            "lmp_task_grp": BigParameter(ExplorationTaskGroup),
            "conf_selector": ConfSelector,
        })

    @OP.exec_sign_check
    def execute(
            self,
            ip: OPIO,
    ) -> OPIO:
        scheduler = ip['exploration_scheduler']
        report = ip['exploration_report']
        trajs = ip['trajs']
        # The scheduler inspects the report/trajectories and returns the
        # convergence flag plus the plan for the next iteration.
        conv, lmp_task_grp, selector = scheduler.plan_next_iteration(report, trajs)
        return OPIO({
            "converged": conv,
            "exploration_scheduler": scheduler,
            "lmp_task_grp": lmp_task_grp,
            "conf_selector": selector,
        })
class MakeBlockId(OP):
    """dflow OP that derives the block id string from the scheduler state."""

    @classmethod
    def get_input_sign(cls):
        # NOTE(review): this method was missing from the extracted source;
        # reconstructed from the single key `execute` reads from `ip` —
        # confirm against the upstream definition.
        return OPIOSign({
            "exploration_scheduler": BigParameter(ExplorationScheduler),
        })

    @classmethod
    def get_output_sign(cls):
        return OPIOSign({
            "block_id": str,
        })

    @OP.exec_sign_check
    def execute(
            self,
            ip: OPIO,
    ) -> OPIO:
        scheduler = ip['exploration_scheduler']
        iteration = scheduler.get_iteration()
        return OPIO({
            # Zero-padded so ids sort lexicographically: iter-000000, ...
            "block_id": f'iter-{iteration:06d}',
        })
class ConcurrentLearningLoop(Steps):
    """Recursive dflow ``Steps`` implementing one concurrent-learning cycle.

    One cycle = run ``block_op`` (train / explore / select / label), feed the
    result to the scheduler, compute the next block id, then re-enter this
    template until the scheduler reports convergence (see ``_loop``).

    Parameters
    ----------
    name : str
        Name of this ``Steps`` template.
    block_op : OPTemplate
        The concurrent-learning block executed each iteration.
    step_config : dict
        Normalized step configuration (template config + executor).
    upload_python_packages : Optional[List[os.PathLike]]
        Extra python packages uploaded to the python OP templates.
    """

    def __init__(
            self,
            name: str,
            block_op: OPTemplate,
            step_config: dict = normalize_step_dict({}),
            upload_python_packages: Optional[List[os.PathLike]] = None,
    ):
        self._input_parameters = {
            "block_id": InputParameter(),
            "type_map": InputParameter(),
            "numb_models": InputParameter(type=int),
            "template_script": InputParameter(),
            "train_config": InputParameter(),
            "lmp_config": InputParameter(),
            "conf_selector": InputParameter(),
            "fp_config": InputParameter(),
            "exploration_scheduler": InputParameter(),
            "lmp_task_grp": InputParameter(),
        }
        self._input_artifacts = {
            # Models from the previous iteration; absent on the first pass.
            "init_models": InputArtifact(optional=True),
            "init_data": InputArtifact(),
            "iter_data": InputArtifact(),
        }
        self._output_parameters = {
            "exploration_scheduler": OutputParameter(),
        }
        self._output_artifacts = {
            "models": OutputArtifact(),
            "iter_data": OutputArtifact(),
        }
        super().__init__(
            name=name,
            inputs=Inputs(
                parameters=self._input_parameters,
                artifacts=self._input_artifacts,
            ),
            outputs=Outputs(
                parameters=self._output_parameters,
                artifacts=self._output_artifacts,
            ),
        )
        self._my_keys = ['block', 'scheduler', 'id']
        # Key order exposed to callers: block, the block op's own keys,
        # then scheduler and id.
        self._keys = \
            self._my_keys[:1] + \
            block_op.keys + \
            self._my_keys[1:3]
        self.step_keys = {}
        for ii in self._my_keys:
            # Prefix each step key with the (templated) block id parameter.
            self.step_keys[ii] = '--'.join(
                ["%s" % self.inputs.parameters["block_id"], ii]
            )
        # _loop mutates this template in place (adds the steps) and returns it.
        self = _loop(
            self,
            self.step_keys,
            name,
            block_op,
            step_config=step_config,
            upload_python_packages=upload_python_packages,
        )

    @property
    def input_parameters(self):
        return self._input_parameters

    @property
    def input_artifacts(self):
        return self._input_artifacts

    @property
    def output_parameters(self):
        return self._output_parameters

    @property
    def output_artifacts(self):
        return self._output_artifacts

    @property
    def keys(self):
        return self._keys
class ConcurrentLearning(Steps):
    """Top-level dflow ``Steps`` for the concurrent-learning workflow.

    Runs the scheduler once to plan the first iteration, computes the initial
    block id, then launches a :class:`ConcurrentLearningLoop` that iterates
    until convergence (see ``_dpgen``).

    Parameters
    ----------
    name : str
        Name of this ``Steps`` template; the inner loop is named
        ``name + '-loop'``.
    block_op : OPTemplate
        The concurrent-learning block executed by each loop iteration.
    step_config : dict
        Normalized step configuration (template config + executor).
    upload_python_packages : Optional[List[os.PathLike]]
        Extra python packages uploaded to the python OP templates.
    """

    def __init__(
            self,
            name: str,
            block_op: OPTemplate,
            step_config: dict = normalize_step_dict({}),
            upload_python_packages: Optional[List[os.PathLike]] = None,
    ):
        self.loop = ConcurrentLearningLoop(
            name + '-loop',
            block_op,
            step_config=step_config,
            upload_python_packages=upload_python_packages,
        )
        self._input_parameters = {
            "type_map": InputParameter(),
            "numb_models": InputParameter(type=int),
            "template_script": InputParameter(),
            "train_config": InputParameter(),
            "lmp_config": InputParameter(),
            "fp_config": InputParameter(),
            "exploration_scheduler": InputParameter(),
        }
        self._input_artifacts = {
            # Pre-trained models are optional for the very first iteration.
            "init_models": InputArtifact(optional=True),
            "init_data": InputArtifact(),
            "iter_data": InputArtifact(),
        }
        self._output_parameters = {
            "exploration_scheduler": OutputParameter(),
        }
        self._output_artifacts = {
            "models": OutputArtifact(),
            "iter_data": OutputArtifact(),
        }
        super().__init__(
            name=name,
            inputs=Inputs(
                parameters=self._input_parameters,
                artifacts=self._input_artifacts,
            ),
            outputs=Outputs(
                parameters=self._output_parameters,
                artifacts=self._output_artifacts,
            ),
        )
        self._init_keys = ['scheduler', 'id']
        self.loop_key = 'loop'
        self.step_keys = {}
        for ii in self._init_keys:
            # The bootstrap steps are keyed under the 'init' prefix.
            self.step_keys[ii] = '--'.join(['init', ii])
        # _dpgen mutates this template in place (adds the steps) and returns it.
        self = _dpgen(
            self,
            self.step_keys,
            name,
            self.loop,
            self.loop_key,
            step_config=step_config,
            upload_python_packages=upload_python_packages,
        )

    @property
    def input_parameters(self):
        return self._input_parameters

    @property
    def input_artifacts(self):
        return self._input_artifacts

    @property
    def output_parameters(self):
        return self._output_parameters

    @property
    def output_artifacts(self):
        return self._output_artifacts

    @property
    def init_keys(self):
        return self._init_keys

    @property
    def loop_keys(self):
        return [self.loop_key] + self.loop.keys
def _loop(
        steps,
        step_keys,
        name: str,
        block_op: OPTemplate,
        step_config: dict = normalize_step_dict({}),
        upload_python_packages: Optional[List[os.PathLike]] = None,
):
    """Populate the :class:`ConcurrentLearningLoop` template ``steps``.

    Adds four steps: run the block, update the scheduler, compute the next
    block id, and (unless converged) recurse into ``steps`` itself. Returns
    the mutated ``steps`` template.
    """
    # Copy before popping so the caller's config dict is not mutated.
    step_config = deepcopy(step_config)
    step_template_config = step_config.pop('template_config')
    step_executor = init_executor(step_config.pop('executor'))

    # 1. Run one concurrent-learning block (train / explore / select / label).
    block_step = Step(
        name=name + '-block',
        template=block_op,
        parameters={
            "block_id": steps.inputs.parameters["block_id"],
            "type_map": steps.inputs.parameters["type_map"],
            "numb_models": steps.inputs.parameters["numb_models"],
            "template_script": steps.inputs.parameters["template_script"],
            "train_config": steps.inputs.parameters["train_config"],
            "lmp_config": steps.inputs.parameters["lmp_config"],
            "conf_selector": steps.inputs.parameters["conf_selector"],
            "fp_config": steps.inputs.parameters["fp_config"],
            "lmp_task_grp": steps.inputs.parameters["lmp_task_grp"],
        },
        artifacts={
            "init_models": steps.inputs.artifacts["init_models"],
            "init_data": steps.inputs.artifacts["init_data"],
            "iter_data": steps.inputs.artifacts["iter_data"],
        },
        key=step_keys['block'],
    )
    steps.add(block_step)

    # 2. Feed the block's report/trajectories to the scheduler, which decides
    #    convergence and plans the next iteration.
    scheduler_step = Step(
        name=name + '-scheduler',
        template=PythonOPTemplate(
            SchedulerWrapper,
            python_packages=upload_python_packages,
            **step_template_config,
        ),
        parameters={
            "exploration_report": block_step.outputs.parameters['exploration_report'],
            "exploration_scheduler": steps.inputs.parameters['exploration_scheduler'],
        },
        artifacts={
            "trajs": block_step.outputs.artifacts['trajs'],
        },
        key=step_keys['scheduler'],
        executor=step_executor,
        **step_config,
    )
    steps.add(scheduler_step)

    # 3. Derive the id of the next block from the updated scheduler.
    id_step = Step(
        name=name + '-make-block-id',
        template=PythonOPTemplate(
            MakeBlockId,
            python_packages=upload_python_packages,
            **step_template_config,
        ),
        parameters={
            "exploration_scheduler": scheduler_step.outputs.parameters['exploration_scheduler'],
        },
        artifacts={},
        key=step_keys['id'],
        executor=step_executor,
        **step_config,
    )
    steps.add(id_step)

    # 4. Recurse: re-enter this very template with the freshly trained models
    #    and accumulated data, but only while the scheduler is not converged.
    next_step = Step(
        name=name + '-next',
        template=steps,
        parameters={
            "block_id": id_step.outputs.parameters['block_id'],
            "type_map": steps.inputs.parameters["type_map"],
            "numb_models": steps.inputs.parameters["numb_models"],
            "template_script": steps.inputs.parameters["template_script"],
            "train_config": steps.inputs.parameters["train_config"],
            "lmp_config": steps.inputs.parameters["lmp_config"],
            "conf_selector": scheduler_step.outputs.parameters["conf_selector"],
            "fp_config": steps.inputs.parameters["fp_config"],
            "exploration_scheduler": scheduler_step.outputs.parameters["exploration_scheduler"],
            "lmp_task_grp": scheduler_step.outputs.parameters["lmp_task_grp"],
        },
        artifacts={
            "init_models": block_step.outputs.artifacts['models'],
            "init_data": steps.inputs.artifacts['init_data'],
            "iter_data": block_step.outputs.artifacts['iter_data'],
        },
        when="%s == false" % (scheduler_step.outputs.parameters['converged']),
    )
    steps.add(next_step)

    # Outputs come from this iteration when converged, otherwise from the
    # recursive call.
    steps.outputs.parameters['exploration_scheduler'].value_from_expression = \
        if_expression(
            _if=(scheduler_step.outputs.parameters['converged'] == True),
            _then=scheduler_step.outputs.parameters['exploration_scheduler'],
            _else=next_step.outputs.parameters['exploration_scheduler'],
        )
    steps.outputs.artifacts['models'].from_expression = \
        if_expression(
            _if=(scheduler_step.outputs.parameters['converged'] == True),
            _then=block_step.outputs.artifacts['models'],
            _else=next_step.outputs.artifacts['models'],
        )
    steps.outputs.artifacts['iter_data'].from_expression = \
        if_expression(
            _if=(scheduler_step.outputs.parameters['converged'] == True),
            _then=block_step.outputs.artifacts['iter_data'],
            _else=next_step.outputs.artifacts['iter_data'],
        )
    return steps
def _dpgen(
        steps,
        step_keys,
        name,
        loop_op,
        loop_key,
        step_config: dict = normalize_step_dict({}),
        upload_python_packages: Optional[List[os.PathLike]] = None,
):
    """Populate the top-level :class:`ConcurrentLearning` template ``steps``.

    Runs the scheduler once with no report/trajectories to plan iteration 0,
    derives the first block id, then hands off to the loop template
    ``loop_op``. Returns the mutated ``steps`` template.
    """
    # Copy before popping so the caller's config dict is not mutated.
    step_config = deepcopy(step_config)
    step_template_config = step_config.pop('template_config')
    step_executor = init_executor(step_config.pop('executor'))

    # Bootstrap scheduler call: no report and no trajs yet (both None), the
    # scheduler only plans the first iteration.
    scheduler_step = Step(
        name=name + '-scheduler',
        template=PythonOPTemplate(
            SchedulerWrapper,
            python_packages=upload_python_packages,
            **step_template_config,
        ),
        parameters={
            "exploration_report": None,
            "exploration_scheduler": steps.inputs.parameters['exploration_scheduler'],
        },
        artifacts={
            "trajs": None,
        },
        key=step_keys['scheduler'],
        executor=step_executor,
        **step_config,
    )
    steps.add(scheduler_step)

    # Compute the id of the first block from the planned scheduler state.
    id_step = Step(
        name=name + '-make-block-id',
        template=PythonOPTemplate(
            MakeBlockId,
            python_packages=upload_python_packages,
            **step_template_config,
        ),
        parameters={
            "exploration_scheduler": scheduler_step.outputs.parameters['exploration_scheduler'],
        },
        artifacts={},
        key=step_keys['id'],
        executor=step_executor,
        **step_config,
    )
    steps.add(id_step)

    # Enter the recursive loop template with the initial plan.
    loop_step = Step(
        name=name + '-loop',
        template=loop_op,
        parameters={
            "block_id": id_step.outputs.parameters['block_id'],
            "type_map": steps.inputs.parameters['type_map'],
            "numb_models": steps.inputs.parameters['numb_models'],
            "template_script": steps.inputs.parameters['template_script'],
            "train_config": steps.inputs.parameters['train_config'],
            "conf_selector": scheduler_step.outputs.parameters['conf_selector'],
            "lmp_config": steps.inputs.parameters['lmp_config'],
            "fp_config": steps.inputs.parameters['fp_config'],
            "exploration_scheduler": scheduler_step.outputs.parameters['exploration_scheduler'],
            "lmp_task_grp": scheduler_step.outputs.parameters['lmp_task_grp'],
        },
        artifacts={
            "init_models": steps.inputs.artifacts["init_models"],
            "init_data": steps.inputs.artifacts["init_data"],
            "iter_data": steps.inputs.artifacts["iter_data"],
        },
        key='--'.join(["%s" % id_step.outputs.parameters['block_id'], loop_key]),
    )
    steps.add(loop_step)

    # The workflow outputs are simply forwarded from the loop.
    steps.outputs.parameters["exploration_scheduler"].value_from_parameter = \
        loop_step.outputs.parameters["exploration_scheduler"]
    steps.outputs.artifacts["models"]._from = \
        loop_step.outputs.artifacts["models"]
    steps.outputs.artifacts["iter_data"]._from = \
        loop_step.outputs.artifacts["iter_data"]
    return steps