PySpark 中的自定义评估器



我想使用排名指标(MAP@k)优化PySpark Pipeline的超参数。我已经在文档中看到了如何使用评估 (Scala) 中定义的指标,但我需要定义一个自定义评估器类,因为尚未实现MAP@k。所以我需要做这样的事情:

model = Pipeline(stages=[indexer, assembler, scaler, lg])
paramGrid_lg = ParamGridBuilder() 
.addGrid(lg.regParam, [0.001, 0.1]) 
.addGrid(lg.elasticNetParam, [0, 1]) 
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator=MAPkEvaluator(), 
numFolds=2)

其中MAPkEvaluator()是我的自定义评估器。我见过类似的问题,但没有答案。

是否有任何示例或文档可用?有谁知道是否可以在 PySpark 中实现它?我应该实施什么方法?

@jarandaf在第一条评论中回答了这个问题,但为了清楚起见,我写了如何使用随机指标实现一个基本示例:

import random
from pyspark.ml.evaluation import Evaluator
class RandomEvaluator(Evaluator):
def __init__(self, predictionCol="prediction", labelCol="label"):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
"""
Returns a random number. 
Implement here the true metric
"""
return random.randint(0,1)
def isLargerBetter(self):
return True

现在以下代码应该可以工作了:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid_lg = ParamGridBuilder() 
.addGrid(lg.regParam, [0.01, 0.1]) 
.addGrid(lg.elasticNetParam, [0, 1]) 
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator= RandomEvaluator(), 
numFolds=2)
cvModel = crossval_lg.fit(train_val_data_)

@Amanda很好地回答了这个问题,但让我也告诉你一些要避免的东西。如果您检查Evaluator()类的帮助:

help(Evaluator())

你将看到那里定义的方法:

isLargerBetter(self)
|      Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
|      (True, default) or minimized (False).
|      A given evaluator may support multiple metrics which may be maximized or minimized.
|      
|      .. versionadded:: 1.5.0

现在,如果需要最小化指标,则需要将此方法设置为:

def isLargerBetter(self):
return False

当前方法的默认值为True

将实际示例添加到@Amanda的明确答案中,以下代码可用于创建自定义Evaulator,用于计算二元分类任务中的 F1 分数。它可能没有优化(我实际上不知道是否有更有效的方法来实现指标),但它可以完成工作。

import pyspark.sql.functions as F
from pyspark.ml.evaluation import Evaluator
class MyEvaluator(Evaluator):
def __init__(self, predictionCol='prediction', labelCol='label'):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
tp = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 1)).count()
fp = dataset.filter((F.col(self.labelCol) == 0) & (F.col(self.predictionCol) == 1)).count()
fn = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 0)).count()
f1 = (2 * tp) / (2 * tp + fp + fn)
return f1
def isLargerBetter(self):
return True

最新更新