Source code for dpgen2.superop.prep_run_lmp

from dflow import (
    InputParameter,
    OutputParameter,
    Inputs,
    InputArtifact,
    Outputs,
    OutputArtifact,
    Workflow,
    Step,
    Steps,
    upload_artifact,
    download_artifact,
    argo_range,
    argo_len,
    argo_sequence,
)
from dflow.python import(
    PythonOPTemplate,
    OP,
    OPIO,
    OPIOSign,
    Artifact,
    Slices,
)
from dpgen2.constants import (
    lmp_index_pattern,
)
from dpgen2.utils.step_config import normalize as normalize_step_dict
from dpgen2.utils.step_config import init_executor

import os
from typing import Optional, Set, List
from pathlib import Path
from copy import deepcopy

[docs]class PrepRunLmp(Steps): def __init__( self, name : str, prep_op : OP, run_op : OP, prep_config : dict = normalize_step_dict({}), run_config : dict = normalize_step_dict({}), upload_python_packages : Optional[List[os.PathLike]] = None, ): self._input_parameters = { "block_id" : InputParameter(type=str, value=""), "lmp_config" : InputParameter(), "lmp_task_grp": InputParameter(), } self._input_artifacts = { "models" : InputArtifact(), } self._output_parameters={ "task_names": OutputParameter(), } self._output_artifacts={ "logs": OutputArtifact(), "trajs": OutputArtifact(), "model_devis": OutputArtifact(), "plm_output": OutputArtifact(), } super().__init__( name=name, inputs=Inputs( parameters=self._input_parameters, artifacts=self._input_artifacts, ), outputs=Outputs( parameters=self._output_parameters, artifacts=self._output_artifacts, ), ) self._keys = ['prep-lmp', 'run-lmp'] self.step_keys = {} ii = 'prep-lmp' self.step_keys[ii] = '--'.join( ["%s"%self.inputs.parameters["block_id"], ii] ) ii = 'run-lmp' self.step_keys[ii] = '--'.join( ["%s"%self.inputs.parameters["block_id"], ii + "-{{item}}"] ) self = _prep_run_lmp( self, self.step_keys, prep_op, run_op, prep_config = prep_config, run_config = run_config, upload_python_packages = upload_python_packages, ) @property def input_parameters(self): return self._input_parameters @property def input_artifacts(self): return self._input_artifacts @property def output_parameters(self): return self._output_parameters @property def output_artifacts(self): return self._output_artifacts @property def keys(self): return self._keys
def _prep_run_lmp( prep_run_steps, step_keys, prep_op : OP, run_op : OP, prep_config : dict = normalize_step_dict({}), run_config : dict = normalize_step_dict({}), upload_python_packages : Optional[List[os.PathLike]] = None, ): prep_config = deepcopy(prep_config) run_config = deepcopy(run_config) prep_template_config = prep_config.pop('template_config') run_template_config = run_config.pop('template_config') prep_executor = init_executor(prep_config.pop('executor')) run_executor = init_executor(run_config.pop('executor')) prep_lmp = Step( 'prep-lmp', template=PythonOPTemplate( prep_op, output_artifact_archive={ "task_paths": None }, python_packages = upload_python_packages, **prep_template_config, ), parameters={ "lmp_task_grp": prep_run_steps.inputs.parameters['lmp_task_grp'], }, artifacts={ }, key = step_keys['prep-lmp'], executor = prep_executor, **prep_config, ) prep_run_steps.add(prep_lmp) run_lmp = Step( 'run-lmp', template=PythonOPTemplate( run_op, slices = Slices( "int('{{item}}')", input_parameter = ["task_name"], input_artifact = ["task_path"], output_artifact = ["log", "traj", "model_devi", "plm_output"], ), python_packages = upload_python_packages, **run_template_config, ), parameters={ "task_name" : prep_lmp.outputs.parameters["task_names"], "config" : prep_run_steps.inputs.parameters["lmp_config"], }, artifacts={ 'task_path' : prep_lmp.outputs.artifacts['task_paths'], "models" : prep_run_steps.inputs.artifacts['models'], }, with_sequence=argo_sequence(argo_len(prep_lmp.outputs.parameters["task_names"]), format=lmp_index_pattern), # with_param=argo_range(argo_len(prep_lmp.outputs.parameters["task_names"])), key = step_keys['run-lmp'], executor = run_executor, **run_config, ) prep_run_steps.add(run_lmp) prep_run_steps.outputs.parameters["task_names"].value_from_parameter = prep_lmp.outputs.parameters["task_names"] prep_run_steps.outputs.artifacts["logs"]._from = run_lmp.outputs.artifacts["log"] prep_run_steps.outputs.artifacts["trajs"]._from = run_lmp.outputs.artifacts["traj"] prep_run_steps.outputs.artifacts["model_devis"]._from = run_lmp.outputs.artifacts["model_devi"] prep_run_steps.outputs.artifacts["plm_output"]._from = run_lmp.outputs.artifacts["plm_output"] return prep_run_steps