用程序读取kubeflow管道的输出



我用python命令在Kubeflow上运行管道,比如:

client.create_run_from_pipeline_func(pipeline_function, arguments=params_dict[name], run_name=name)

它在Kubeflow管道上创建了一个作业,我希望能够使用python API访问管道不同步骤的信息,例如像下面这样(示意性的伪代码):

job.get({step_name}).get_custom_properties({property_name})

我可以通过在Kubeflow中打开运行并查看我感兴趣的管道步骤的自定义属性来做到这一点,但我想自动化这个过程。你知道使用python API是否可以做到这一点吗?

我使用这个类从kubeflow运行中提取参数:

import json
from typing import Dict
from typing import List
from kfp_server_api.models.api_run_detail import ApiRunDetail

class PipelineResult:
    """Convenience wrapper exposing selected fields of one pipeline run.

    The wrapped run description is expected to provide
    ``pipeline_runtime.workflow_manifest`` as a JSON string; every accessor
    below reads from that decoded manifest.
    """

    # Name of the input parameter returned by ``training_set_path``.
    DATASET_PATH_NAME = "data-load-features-DATA_SET_PATH"

    def __init__(self, run_description: "ApiRunDetail"):
        """Store the run description; nothing is parsed until an accessor is used."""
        self._run_description = run_description

    @property
    def workflow_manifest(self) -> dict:
        """Return the workflow manifest decoded from JSON.

        The JSON string is decoded on every access, so each call yields a
        fresh, independent dict.
        """
        return json.loads(self._run_description.pipeline_runtime.workflow_manifest)

    @property
    def status(self) -> str:
        """Return the ``status.phase`` value of the manifest."""
        return self.workflow_manifest["status"]["phase"]

    @property
    def params(self) -> List[Dict]:
        """Return every input parameter of every node, tagged with its node.

        Each entry is the parameter dict from the manifest with an extra
        ``"node_name"`` key holding the key of the node it belongs to.
        Nodes without ``inputs`` or without ``parameters`` contribute nothing.
        """
        # ``nodes`` may be absent from ``status`` (presumably when the run has
        # not started any step yet); treat that as "no parameters" rather than
        # raising KeyError.
        nodes = self.workflow_manifest["status"].get("nodes", {})
        collected = []
        for node_name, node in nodes.items():
            node_params = node.get("inputs", {}).get("parameters", [])
            collected.extend({"node_name": node_name, **param} for param in node_params)
        return collected

    def get_param(self, param_name: str):
        """Return the value of the first input parameter named *param_name*.

        Returns ``None`` when no node has a parameter with that name.
        """
        for entry in self.params:
            if entry["name"] == param_name:
                return entry["value"]
        return None

    @property
    def training_set_path(self):
        """Return the value of the ``DATASET_PATH_NAME`` parameter, or ``None``."""
        return self.get_param(self.DATASET_PATH_NAME)

    @property
    def run_name(self):
        """Return the ``pipelines.kubeflow.org/run_name`` metadata annotation."""
        annotations = self.workflow_manifest["metadata"]["annotations"]
        return annotations["pipelines.kubeflow.org/run_name"]

    def as_dict(self):
        """Return status, training set path and run name as a flat dict."""
        return {
            "status": self.status,
            "training_set_path": self.training_set_path,
            "run_name": self.run_name,
        }

# The snippet refers to ``kfp`` and ``pd`` but never imports them; import them
# here so it runs as written.
import kfp
import pandas as pd

client = kfp.Client()
# Ask for up to 30 runs in the namespace, newest first.
api_response = client.list_runs(namespace='...', sort_by='created_at desc', page_size=30)
# NOTE(review): ``runs`` is presumably None when no run matches — confirm
# against the kfp client; ``or []`` keeps the comprehension from raising then.
runs_descriptions = [client.get_run(run.id) for run in (api_response.runs or [])]
# One row per run: status, training set path and run name.
runs = pd.DataFrame([PipelineResult(el).as_dict() for el in runs_descriptions])

最新更新