我想使用排名指标(MAP@k)优化PySpark Pipeline的超参数。我已经在文档中看到了如何使用评估 (Scala) 中定义的指标,但我需要定义一个自定义评估器类,因为尚未实现MAP@k。所以我需要做这样的事情:
model = Pipeline(stages=[indexer, assembler, scaler, lg])
paramGrid_lg = ParamGridBuilder()
.addGrid(lg.regParam, [0.001, 0.1])
.addGrid(lg.elasticNetParam, [0, 1])
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator=MAPkEvaluator(),
numFolds=2)
其中MAPkEvaluator()
是我的自定义评估器。我见过类似的问题,但没有答案。
是否有任何示例或文档可用?有谁知道是否可以在 PySpark 中实现它?我应该实施什么方法?
@jarandaf在第一条评论中回答了这个问题,但为了清楚起见,我写了如何使用随机指标实现一个基本示例:
import random
from pyspark.ml.evaluation import Evaluator
class RandomEvaluator(Evaluator):
def __init__(self, predictionCol="prediction", labelCol="label"):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
"""
Returns a random number.
Implement here the true metric
"""
return random.randint(0,1)
def isLargerBetter(self):
return True
现在以下代码应该可以工作了:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid_lg = ParamGridBuilder()
.addGrid(lg.regParam, [0.01, 0.1])
.addGrid(lg.elasticNetParam, [0, 1])
.build()
crossval_lg = CrossValidator(estimator=model,
estimatorParamMaps=paramGrid_lg,
evaluator= RandomEvaluator(),
numFolds=2)
cvModel = crossval_lg.fit(train_val_data_)
@Amanda很好地回答了这个问题,但让我也告诉你一些要避免的东西。如果您检查Evaluator()
类的帮助:
help(Evaluator())
你将看到那里定义的方法:
isLargerBetter(self)
| Indicates whether the metric returned by :py:meth:`evaluate` should be maximized
| (True, default) or minimized (False).
| A given evaluator may support multiple metrics which may be maximized or minimized.
|
| .. versionadded:: 1.5.0
现在,如果需要最小化指标,则需要将此方法设置为:
def isLargerBetter(self):
return False
当前方法的默认值为True
。
将实际示例添加到@Amanda的明确答案中,以下代码可用于创建自定义Evaulator
,用于计算二元分类任务中的 F1 分数。它可能没有优化(我实际上不知道是否有更有效的方法来实现指标),但它可以完成工作。
import pyspark.sql.functions as F
from pyspark.ml.evaluation import Evaluator
class MyEvaluator(Evaluator):
def __init__(self, predictionCol='prediction', labelCol='label'):
self.predictionCol = predictionCol
self.labelCol = labelCol
def _evaluate(self, dataset):
tp = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 1)).count()
fp = dataset.filter((F.col(self.labelCol) == 0) & (F.col(self.predictionCol) == 1)).count()
fn = dataset.filter((F.col(self.labelCol) == 1) & (F.col(self.predictionCol) == 0)).count()
f1 = (2 * tp) / (2 * tp + fp + fn)
return f1
def isLargerBetter(self):
return True