Is it possible to output a list of artifacts of the same type with Kubeflow Pipelines?



I am trying to output multiple confusion matrices from a single Kubeflow Pipelines component, in the same way this example outputs just one:

    from kfp.v2.dsl import (ClassificationMetrics, Dataset, Input, Metrics,
                            Model, Output)

    def eval_model(
        test_set: Input[Dataset],
        xgb_model: Input[Model],
        metrics: Output[ClassificationMetrics],
        smetrics: Output[Metrics]
    ):
        import pandas as pd
        from sklearn.metrics import confusion_matrix, roc_curve
        from xgboost import XGBClassifier

        # Load the test data and the trained model.
        data = pd.read_csv(test_set.path)
        features = data.drop(columns=["target"])
        model = XGBClassifier()
        model.load_model(xgb_model.path)

        score = model.score(features, data.target)

        # Log the ROC curve from the positive-class probabilities.
        y_scores = model.predict_proba(features)[:, 1]
        fpr, tpr, thresholds = roc_curve(
            y_true=data.target.to_numpy(), y_score=y_scores, pos_label=True
        )
        metrics.log_roc_curve(fpr.tolist(), tpr.tolist(), thresholds.tolist())

        # Log a single confusion matrix.
        y_pred = model.predict(features)
        metrics.log_confusion_matrix(
            ["False", "True"],
            confusion_matrix(data.target, y_pred).tolist(),  # .tolist() converts the np array to a list.
        )

        xgb_model.metadata["test_score"] = float(score)
        smetrics.log_metric("score", float(score))

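Do you know whether it is possible to output multiple confusion matrices without defining multiple output parameters?

The only thing I could come up with was to define the function as: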

    def eval_model(
        test_set: Input[Dataset],
        xgb_model: Input[Model],
        metrics: Output[List[ClassificationMetrics]],
        smetrics: Output[Metrics]
    ):
        ...  # body omitted

But this does not work, because a list has no log_confusion_matrix method.

I did not find much in their source code: https://github.com/kubeflow/pipelines/blob/55a2fb5c20011b01945c9867ddff0d39e9db1964/sdk/python/kfp/v2/components/types/artifact_types.py#L255-L256

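According to the current component specification, a component can output only a single ClassificationMetrics object that can be visualized. So even if log_confusion_matrix is called multiple times, each call overwrites the previous one and only the last confusion matrix is visualized.

As a workaround for your requirement, I suggest three options.

  1. Use a for loop in the pipeline definition to generate multiple components, each of which outputs one confusion matrix. For example, each model can run as its own component and log its confusion matrix there. See the following code: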
    @dsl.pipeline(
        name='metrics-visualization-pipeline')
    def metrics_visualization_pipeline():
        iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)
        # One eval_model task (and one confusion matrix) per model.
        for model in ["xgb_classifier_1.joblib", "xgb_classifier_2.joblib"]:
            eval_task = eval_model(model)
  2. Use ParallelFor from the kfp.dsl package, which runs multiple instances of the same component in parallel. See the following code:
    @dsl.pipeline(
        name='metrics-visualization-pipeline')
    def metrics_visualization_pipeline():
        iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)
        # ParallelFor fans out one eval_model task per list item.
        with dsl.ParallelFor(["xgb_classifier_1.joblib", "xgb_classifier_2.joblib"]) as model:
            eval_task = eval_model(model)

I got similar results with workaround 1 (and 2).

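  3. Use the Markdown or HTML visualization suggested in the documentation to show multiple confusion matrices in a single Markdown or HTML artifact. The component code below produces a Markdown artifact that renders three confusion matrices as tables. The code that converts a list into a markdown table was borrowed from another source.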
    from kfp import dsl
    from kfp.v2.dsl import component, Markdown, Output  # in KFP SDK 2.x these live in kfp.dsl

    @component(
        packages_to_install=['scikit-learn'],  # the PyPI name is scikit-learn, not sklearn
        base_image='python:3.9',
    )
    def iris_sgdclassifier(
        test_samples_fraction: float
    ) -> dict:
        from sklearn import datasets, model_selection
        from sklearn.linear_model import SGDClassifier
        from sklearn.metrics import confusion_matrix

        iris_dataset = datasets.load_iris()
        train_x, test_x, train_y, test_y = model_selection.train_test_split(
            iris_dataset['data'], iris_dataset['target'], test_size=test_samples_fraction)
        classifier = SGDClassifier()
        confusion_matrices_dictionary = {}
        # Confusion matrices generated for multiple values of cross-validation splits.
        for cv in [3, 5, 7]:
            classifier.fit(train_x, train_y)
            predictions = model_selection.cross_val_predict(classifier, train_x, train_y, cv=cv)
            confusion_matrices_dictionary["experiment_cv_" + str(cv)] = {
                "categories": ['Setosa', 'Versicolour', 'Virginica'],
                "confusion_matrix": confusion_matrix(train_y, predictions).tolist(),
            }
        return confusion_matrices_dictionary
        ## an element in the dictionary => {'experiment_cv_3': {'categories': ['Setosa', 'Versicolour', 'Virginica'],
        ## 'confusion_matrix': [[35, 0, 0], [16, 7, 19], [0, 0, 43]]}}

    @component(
        packages_to_install=["numpy"],
        base_image='python:3.9',
    )
    def visualize_confusion_matrices(
        confusion_matrices_dictionary: dict,
        markdown_artifact: Output[Markdown]
    ):
        import numpy as np

        for key in confusion_matrices_dictionary.keys():
            categories = confusion_matrices_dictionary[key]["categories"]
            confusion_matrix = confusion_matrices_dictionary[key]["confusion_matrix"]
            # Build a (n+1) x (n+1) grid: category labels on the first row and column.
            matrix_size = len(categories) + 1
            table_struct = np.empty((matrix_size, matrix_size), dtype=object)
            table_struct[0, 0] = "Categories"
            table_struct[0, 1:] = categories
            table_struct[1:, 0] = categories
            table_struct[1:, 1:] = confusion_matrix
            ## code to convert list to markdown table
            markdown = "## " + key + "\n\n" + "| "
            for e in table_struct[0]:
                markdown += " " + str(e) + " |"
            markdown += "\n"
            markdown += '|'
            for i in range(len(table_struct[0])):
                markdown += "-------------- | "
            markdown += "\n"
            for entry in table_struct[1:]:
                markdown += "| "
                for e in entry:
                    markdown += str(e) + " | "
                markdown += "\n"
            # Append, so every matrix ends up in the same markdown artifact.
            with open(markdown_artifact.path, 'a') as f:
                f.write(markdown)
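
The table-building logic in the component above can also be written without numpy. The following is only a sketch under that assumption: the helper names `confusion_matrix_to_markdown` and `confusion_matrices_to_markdown` are hypothetical and not part of the KFP SDK, and the input is assumed to have the same dictionary shape that `iris_sgdclassifier` returns.

```python
from typing import Dict, Sequence


def confusion_matrix_to_markdown(
    title: str,
    categories: Sequence[str],
    matrix: Sequence[Sequence[int]],
) -> str:
    """Render one confusion matrix as a markdown heading followed by a table."""
    header = ["Categories", *categories]
    lines = [
        f"## {title}",
        "",
        "| " + " | ".join(header) + " |",
        "| " + " | ".join("---" for _ in header) + " |",
    ]
    # One table row per true category, labelled in the first column.
    for category, row in zip(categories, matrix):
        cells = [category, *(str(value) for value in row)]
        lines.append("| " + " | ".join(cells) + " |")
    return "\n".join(lines) + "\n"


def confusion_matrices_to_markdown(matrices: Dict[str, dict]) -> str:
    """Join several matrices into one markdown document, separated by blank lines."""
    return "\n".join(
        confusion_matrix_to_markdown(key, entry["categories"], entry["confusion_matrix"])
        for key, entry in matrices.items()
    )


example = {
    "experiment_cv_3": {
        "categories": ["Setosa", "Versicolour", "Virginica"],
        "confusion_matrix": [[35, 0, 0], [16, 7, 19], [0, 0, 43]],
    }
}
print(confusion_matrices_to_markdown(example))
```

Inside a component, the returned string would be written once to `markdown_artifact.path`, which avoids reopening the file in append mode for every matrix.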

    @dsl.pipeline(
        name='metrics-visualization-pipeline')
    def metrics_visualization_pipeline():
        iris_sgdclassifier_op = iris_sgdclassifier(test_samples_fraction=0.2)
        # Pass the dictionary of confusion matrices to the visualization component.
        cms = visualize_confusion_matrices(iris_sgdclassifier_op.output)
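
The HTML option mentioned in the third workaround follows the same pattern. Below is a minimal sketch of the rendering step only, with no KFP dependency; the function name `confusion_matrix_to_html` is hypothetical, and it is an assumption that the resulting string would be written to the path of an HTML output artifact in the same way the markdown string is written above.

```python
from html import escape
from typing import Sequence


def confusion_matrix_to_html(
    title: str,
    categories: Sequence[str],
    matrix: Sequence[Sequence[int]],
) -> str:
    """Render one confusion matrix as an HTML heading followed by a table."""
    # Header row: a corner label, then one column per predicted category.
    head = "".join(
        f"<th>{escape(str(label))}</th>" for label in ["Categories", *categories]
    )
    # Body rows: the true category as a row header, then the counts.
    body = "".join(
        "<tr><th>{}</th>{}</tr>".format(
            escape(str(category)),
            "".join(f"<td>{escape(str(value))}</td>" for value in row),
        )
        for category, row in zip(categories, matrix)
    )
    return f"<h2>{escape(title)}</h2><table><tr>{head}</tr>{body}</table>"


# Concatenating the fragments puts several matrices in a single HTML document.
page = "".join(
    confusion_matrix_to_html(name, ["False", "True"], cm)
    for name, cm in {"model_1": [[5, 1], [2, 4]], "model_2": [[6, 0], [3, 3]]}.items()
)
print(page)
```

Labels and titles are passed through `html.escape` so that category names containing characters such as `<` or `&` do not break the markup.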
