Source code for dpgen2.op.collect_data

from dflow.python import OP, OPIO, OPIOSign, Artifact
import os, json, dpdata
from typing import Tuple, List, Set
from pathlib import Path


[docs]class CollectData(OP): """Collect labeled data and add to the iteration dataset. After running FP tasks, the labeled data are scattered in task directories. This OP collect the labeled data in one data directory and add it to the iteration data. The data generated by this iteration will be place in `ip["name"]` subdirectory of the iteration data directory. """
[docs] @classmethod def get_input_sign(cls): return OPIOSign( { "name": str, "type_map": List[str], "labeled_data": Artifact(List[Path]), "iter_data": Artifact(List[Path]), } )
[docs] @classmethod def get_output_sign(cls): return OPIOSign( { "iter_data": Artifact(List[Path]), } )
[docs] @OP.exec_sign_check def execute( self, ip: OPIO, ) -> OPIO: r"""Execute the OP. This OP collect data scattered in directories given by `ip['labeled_data']` in to one `dpdata.Multisystems` and store it in a directory named `name`. This directory is appended to the list `iter_data`. Parameters ---------- ip : dict Input dict with components: - `name`: (`str`) The name of this iteration. The data generated by this iteration will be place in a sub-directory of `name`. - `labeled_data`: (`Artifact(List[Path])`) The paths of labeled data generated by FP tasks of the current iteration. - `iter_data`: (`Artifact(List[Path])`) The data paths previous iterations. Returns ------- Output dict with components: - `iter_data`: (`Artifact(List[Path])`) The data paths of previous and the current iteration data. """ name = ip["name"] type_map = ip["type_map"] labeled_data = ip["labeled_data"] iter_data = ip["iter_data"] ms = dpdata.MultiSystems(type_map=type_map) for ii in labeled_data: ss = dpdata.LabeledSystem(ii, fmt="deepmd/npy") ms.append(ss) # NOTICE: # if ms.get_nframes() == 0, ms.to_deepmd_npy would not make the dir Path(name) Path(name).mkdir() ms.to_deepmd_npy(name) iter_data.append(Path(name)) return OPIO( { "iter_data": iter_data, } )