from dflow.python import (
OP,
OPIO,
OPIOSign,
Artifact
)
import os, json, dpdata
from typing import Tuple, List, Set
from pathlib import Path
class CollectData(OP):
    """Collect labeled data and add it to the iteration dataset.

    After running FP tasks, the labeled data are scattered across task
    directories. This OP collects them into one data directory and
    appends that directory to the iteration data. The data generated by
    this iteration are placed in a directory named `ip["name"]`.
    """

    @classmethod
    def get_input_sign(cls):
        # Keys mirror exactly what `execute` reads from `ip`; the
        # `OP.exec_sign_check` decorator on `execute` checks the inputs
        # against this signature.
        return OPIOSign({
            "name": str,
            "type_map": List[str],
            "labeled_data": Artifact(List[Path]),
            "iter_data": Artifact(List[Path]),
        })

    @classmethod
    def get_output_sign(cls):
        return OPIOSign({
            "iter_data": Artifact(List[Path]),
        })

    @OP.exec_sign_check
    def execute(
        self,
        ip: OPIO,
    ) -> OPIO:
        r"""Execute the OP.

        Collect the data scattered in the directories given by
        `ip['labeled_data']` into one `dpdata.MultiSystems` and store it
        in a directory named `ip['name']`. That directory is appended to
        the list `iter_data`.

        Parameters
        ----------
        ip : dict
            Input dict with components:

            - `name`: (`str`) The name of this iteration. The data generated by this iteration are stored in a directory with this name.
            - `type_map`: (`List[str]`) The type map passed to `dpdata.MultiSystems`.
            - `labeled_data`: (`Artifact(List[Path])`) The paths of labeled data (in `deepmd/npy` format) generated by FP tasks of the current iteration.
            - `iter_data`: (`Artifact(List[Path])`) The data paths of previous iterations.

        Returns
        -------
        Output dict with components:

        - `iter_data`: (`Artifact(List[Path])`) The data paths of previous iterations followed by the data path of the current iteration.
        """
        name = ip['name']
        type_map = ip['type_map']
        labeled_data = ip['labeled_data']
        # Work on a copy so the caller's input list is not mutated in place.
        iter_data = list(ip['iter_data'])

        ms = dpdata.MultiSystems(type_map=type_map)
        for ii in labeled_data:
            ss = dpdata.LabeledSystem(ii, fmt='deepmd/npy')
            ms.append(ss)

        out_dir = Path(name)
        # NOTICE:
        # if ms.get_nframes() == 0, ms.to_deepmd_npy would not make the dir
        # Path(name), so create it explicitly to guarantee the returned path
        # exists. exist_ok avoids a FileExistsError when the directory is
        # already present (e.g. a re-run in the same working directory).
        out_dir.mkdir(parents=True, exist_ok=True)
        ms.to_deepmd_npy(name)
        iter_data.append(out_dir)
        return OPIO({
            'iter_data': iter_data,
        })