Operators

There are two types of OPs in DPGEN2

  • OP. An execution unit the the workflow. It can be roughly viewed as a piece of Python script taking some input and gives some outputs. An OP cannot be used in the dflow until it is embedded in a super-OP.

  • Super-OP. An execution unite that is composed by one or more OP and/or super-OPs.

Techinically, OP is a Python class derived from dflow.python.OP. It serves as the PythonOPTemplate of dflow.Step.

The super-OP is a Python class derived from dflow.Steps. It contains dflow.Steps as building blocks, and can be used as OP template to generate a dflow.Step. The explanation of the concepts dflow.Step and dflow.Steps, one may refer to the manual of dflow.

The super-OP PrepRunDPTrain

In the following we will take the PrepRunDPTrain super-OP as an example to illustrate how to write OPs in DPGEN2.

PrepRunDPTrain is a super-OP that prepares several DeePMD-kit training tasks, and submit all of them. This super-OP is composed by two dflow.Steps building from dflow.python.OPs PrepDPTrain and RunDPTrain.

from dflow import (
    Step,
    Steps,
)
from dflow.python import(
    PythonOPTemplate,
    OP,
    Slices,
)

class PrepRunDPTrain(Steps):
    def __init__(
            self,
            name : str,
            prep_train_op : OP,
            run_train_op : OP,
            prep_train_image : str = "dflow:v1.0",
            run_train_image : str = "dflow:v1.0",
    ):
		...
        self = _prep_run_dp_train(
            self, 
            self.step_keys,
            prep_train_op,
            run_train_op,
            prep_train_image = prep_train_image,
            run_train_image = run_train_image,
        )            

The construction of the PrepRunDPTrain takes prepare-training OP and run-training OP and their docker images as input, and implemented in internal method _prep_run_dp_train.

def _prep_run_dp_train(
        train_steps,
        step_keys,
        prep_train_op : OP = PrepDPTrain,
        run_train_op : OP = RunDPTrain,
        prep_train_image : str = "dflow:v1.0",
        run_train_image : str = "dflow:v1.0",
):
    prep_train = Step(
        ...
        template=PythonOPTemplate(
            prep_train_op,
            image=prep_train_image,
            ...
        ),
        ...
    )
    train_steps.add(prep_train)

    run_train = Step(
        ...
        template=PythonOPTemplate(
            run_train_op,
            image=run_train_image,
            ...
        ),
        ...
    )
    train_steps.add(run_train)

    train_steps.outputs.artifacts["scripts"]._from = run_train.outputs.artifacts["script"]
    train_steps.outputs.artifacts["models"]._from = run_train.outputs.artifacts["model"]
    train_steps.outputs.artifacts["logs"]._from = run_train.outputs.artifacts["log"]
    train_steps.outputs.artifacts["lcurves"]._from = run_train.outputs.artifacts["lcurve"]

    return train_steps	

In _prep_run_dp_train, two instances of dflow.Step, i.e. prep_train and run_train, generated from prep_train_op and run_train_op, respectively, are added to train_steps. Both of prep_train_op and run_train_op are OPs (python classes derived from dflow.python.OPs) that will be illustrated later. train_steps is an instance of dflow.Steps. The outputs of the second OP run_train are assigned to the outputs of the train_steps.

The prep_train prepares a list of paths, each of which contains all necessary files to start a DeePMD-kit training tasks.

The run_train slices the list of paths, and assign each item in the list to a DeePMD-kit task. The task is executed by run_train_op. This is a very nice feature of dflow, because the developer only needs to implement how one DeePMD-kit task is executed, and then all the items in the task list will be executed in parallel. See the following code to see how it works

    run_train = Step(
        'run-train',
        template=PythonOPTemplate(
            run_train_op,
            image=run_train_image,
            slices = Slices(
                "int('{{item}}')",
                input_parameter = ["task_name"],
                input_artifact = ["task_path", "init_model"],
                output_artifact = ["model", "lcurve", "log", "script"],
            ),
        ),
        parameters={
            "config" : train_steps.inputs.parameters["train_config"],
            "task_name" : prep_train.outputs.parameters["task_names"],
        },
        artifacts={
            'task_path' : prep_train.outputs.artifacts['task_paths'],
            "init_model" : train_steps.inputs.artifacts['init_models'],
            "init_data": train_steps.inputs.artifacts['init_data'],
            "iter_data": train_steps.inputs.artifacts['iter_data'],
        },
        with_sequence=argo_sequence(argo_len(prep_train.outputs.parameters["task_names"]), format=train_index_pattern),
        key = step_keys['run-train'],
    )

The input parameter "task_names" and artifacts "task_paths" and "init_model" are sliced and supplied to each DeePMD-kit task. The output artifacts of the tasks ("model", "lcurve", "log" and "script") are stacked in the same order as the input lists. These lists are assigned as the outputs of train_steps by

    train_steps.outputs.artifacts["scripts"]._from = run_train.outputs.artifacts["script"]
    train_steps.outputs.artifacts["models"]._from = run_train.outputs.artifacts["model"]
    train_steps.outputs.artifacts["logs"]._from = run_train.outputs.artifacts["log"]
    train_steps.outputs.artifacts["lcurves"]._from = run_train.outputs.artifacts["lcurve"]

The OP RunDPTrain

We will take RunDPTrain as an example to illustrate how to implement an OP in DPGEN2. The source code of this OP is found here

Firstly of all, an OP should be implemented as a derived class of dflow.python.OP.

The dflow.python.OP requires static type define for the input and output variables, i.e. the signatures of an OP. The input and output signatures of the dflow.python.OP are given by classmethods get_input_sign and get_output_sign.

from dflow.python import (
    OP,
    OPIO,
    OPIOSign,
    Artifact,
)
class RunDPTrain(OP):
    @classmethod
    def get_input_sign(cls):
        return OPIOSign({
            "config" : dict,
            "task_name" : str,
            "task_path" : Artifact(Path),
            "init_model" : Artifact(Path),
            "init_data" : Artifact(List[Path]),
            "iter_data" : Artifact(List[Path]),
        })
    
    @classmethod
    def get_output_sign(cls):
        return OPIOSign({
            "script" : Artifact(Path),
            "model" : Artifact(Path),
            "lcurve" : Artifact(Path),
            "log" : Artifact(Path),
        })

All items not defined as Artifact are treated as parameters of the OP. The concept of parameter and artifact are explained in the dflow document. To be short, the artifacts can be pathlib.Path or a list of pathlib.Path. The artifacts are passed by the file system. Other data structures are treated as parameters, they are passed as variables encoded in str. Therefore, a large amout of information should be stored in artifacts, otherwise they can be considered as parameters.

The operation of the OP is implemented in method execute, and are run in docker containers. Again taking the execute method of RunDPTrain as an example

    @OP.exec_sign_check
    def execute(
            self,
            ip : OPIO,
    ) -> OPIO:
        ...
        task_name = ip['task_name']
        task_path = ip['task_path']
        init_model = ip['init_model']
        init_data = ip['init_data']
        iter_data = ip['iter_data']
        ...
        work_dir = Path(task_name)
        ...
        # here copy all files in task_path to work_dir
        ...
        with set_directory(work_dir):
            fplog = open('train.log', 'w')
            def clean_before_quit():
                fplog.close()
            # train model
            command = ['dp', 'train', train_script_name]
            ret, out, err = run_command(command)
            if ret != 0:
                clean_before_quit()
                raise FatalError('dp train failed')
            fplog.write(out)
            # freeze model
            ret, out, err = run_command(['dp', 'freeze', '-o', 'frozen_model.pb'])
            if ret != 0:
                clean_before_quit()
                raise FatalError('dp freeze failed')
            fplog.write(out)
            clean_before_quit()

        return OPIO({
            "script" : work_dir / train_script_name,
            "model" : work_dir / "frozen_model.pb",
            "lcurve" : work_dir / "lcurve.out",
            "log" : work_dir / "train.log",
        })

The inputs and outputs variables are recorded in data structure dflow.python.OPIO, which is initialized by a Python dict. The keys in the input/output dict, and the types of the input/output variables will be checked against their signatures by decorator OP.exec_sign_check. If any key or type does not match, an exception will be raised.

It is noted that all input artifacts of the OP are read-only, therefore, the first step of the RunDPTrain.execute is to copy all necessary input files from the directory task_path prepared by PrepDPTrain to the working directory work_dir.

with_directory method creates the work_dir and swithes to the directory before the execution, and then exits the directoy when the task finishes or an error is raised.

In what follows, the training and model frozen bash commands are executed consecutively. The return code is check and a FatalError is raised if a non-zero code is detected.

Finally the trained model file, input script, learning curve file and the log file are recored in a dflow.python.OPIO and returned.