Source code for dpgen2.entrypoint.status
import logging
from typing import (
Dict,
List,
Optional,
Union,
)
from dflow import (
Workflow,
)
from dpgen2.entrypoint.args import normalize as normalize_args
from dpgen2.entrypoint.common import (
global_config_workflow,
)
from dpgen2.utils.dflow_query import (
get_all_schedulers,
get_last_scheduler,
)
[docs]
def status(
    workflow_id,
    wf_config: Optional[Dict] = None,
):
    """Print the convergence summary of a workflow's last scheduler.

    The workflow configuration is normalized and applied globally, the
    workflow is looked up by its ID, and the scheduler returned by
    ``get_last_scheduler`` is asked for its convergence report, which is
    printed to stdout. If no scheduler is returned, a warning is logged
    instead.

    Parameters
    ----------
    workflow_id
        ID of the dflow workflow to query.
    wf_config : dict, optional
        Workflow configuration. It is passed through ``normalize_args``
        and then to ``global_config_workflow``. ``None`` (the default) is
        treated as an empty configuration.

    Returns
    -------
    None
    """
    # Build a fresh dict per call instead of sharing one mutable default
    # object across calls; this also makes an explicit ``None`` (allowed by
    # the ``Optional`` annotation) behave like the default.
    if wf_config is None:
        wf_config = {}
    wf_config = normalize_args(wf_config)
    global_config_workflow(wf_config)

    wf = Workflow(id=workflow_id)
    wf_keys = wf.query_keys_of_steps()

    scheduler = get_last_scheduler(wf, wf_keys)
    if scheduler is not None:
        ptr_str = scheduler.print_convergence()
        print(ptr_str)
    else:
        # ``logging.warn`` was a deprecated alias and no longer exists in
        # Python 3.13+; ``logging.warning`` is the supported spelling.
        logging.warning("no scheduler is finished")