如何将LIBSVM模型(使用LIBSVM保存)读取到Pyspark中



我有一个我想移植到pyspark的libsvm缩放模型(由SVM尺度生成)。我很自然地尝试了以下内容:

scaler_path = "path to model"
a = MinMaxScaler().load(scaler_path)

但是我丢了一个错误,期待一个元数据目录:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-22-1942e7522174> in <module>()
----> 1 a = MinMaxScaler().load(scaler_path)
/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/util.pyc in load(cls, path)
    226     def load(cls, path):
    227         """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
--> 228         return cls.read().load(path)
    229 
    230 
/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/util.pyc in load(self, path)
    174         if not isinstance(path, basestring):
    175             raise TypeError("path should be a basestring, got type %s" % type(path))
--> 176         java_obj = self._jread.load(path)
    177         if not hasattr(self._clazz, "_from_java"):
    178             raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"
/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:
/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()
/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(
Py4JJavaError: An error occurred while calling o321.load.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:[filename]/metadata

````

是否有一个简单的工作来加载此功能?LIBSVM模型的格式为

x
0 1
1 -1050 1030
2 0 1
3 0 3
4 0 1
5 0 1

首先,显示的文件不是以libsvm格式。LIBSVM文件的正确格式如下:

<label> <index1>:<value1> <index2>:<value2> ... <indexN>:<valueN>

因此,您的数据准备不正确。

其次,使用MinMaxScaler使用的类方法load(path)从输入路径读取ML实例。

请记住: MinMaxScaler计算数据集的摘要统计信息并产生MinMaxScalerModel。然后,该模型可以单独转换每个功能,以使其在给定的范围内。

例如:

from pyspark.ml.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import MinMaxScaler
df = spark.createDataFrame([(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])) ,(0.0, Vectors.dense([1.01, 2.02, 3.03]))],['label','features'])
df.show(truncate=False)
# +-----+---------------------+
# |label|features             |
# +-----+---------------------+
# |1.1  |(3,[0,2],[1.23,4.56])|
# |0.0  |[1.01,2.02,3.03]     |
# +-----+---------------------+
mmScaler = MinMaxScaler(inputCol="features", outputCol="scaled")
temp_path = "/tmp/spark/"
minMaxScalerPath = temp_path + "min-max-scaler"
mmScaler.save(minMaxScalerPath)   

上面的摘要将保存MinMaxScaler功能变压器,因此可以在类方法加载后加载它。

现在,让我们看看实际发生的事情。类方法save将创建以下文件结构:

/tmp/spark/
└── min-max-scaler
    └── metadata
        ├── part-00000
        └── _SUCCESS

让我们检查该part-0000文件的内容:

$ cat /tmp/spark/min-max-scaler/metadata/part-00000 | python -m json.tool
{
    "class": "org.apache.spark.ml.feature.MinMaxScaler",
    "paramMap": {
        "inputCol": "features",
        "max": 1.0,
        "min": 0.0,
        "outputCol": "scaled"
    },
    "sparkVersion": "2.0.0",
    "timestamp": 1480501003244,
    "uid": "MinMaxScaler_42e68455a929c67ba66f"
}

因此,当您加载变压器时实际上是:

loadedMMScaler = MinMaxScaler.load(minMaxScalerPath)

您实际上是在加载该文件。它不会使用libsvm文件!

现在您可以应用变压器来创建模型并转换DataFrame

model = loadedMMScaler.fit(df)
model.transform(df).show(truncate=False)                                    
# +-----+---------------------+-------------+
# |label|features             |scaled       |
# +-----+---------------------+-------------+
# |1.1  |(3,[0,2],[1.23,4.56])|[1.0,0.0,1.0]|
# |0.0  |[1.01,2.02,3.03]     |[0.0,1.0,0.0]|
# +-----+---------------------+-------------+

现在让我们回到该libsvm文件,让我们创建一些虚拟数据并使用MLUtils

将其保存到LIBSVM格式
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = sc.parallelize([LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))])
MLUtils.saveAsLibSVMFile(data, temp_path + "data")

回到我们的文件结构:

/tmp/spark/
├── data
│   ├── part-00000
│   ├── part-00001
│   ├── part-00002
│   ├── part-00003
│   ├── part-00004
│   ├── part-00005
│   ├── part-00006
│   ├── part-00007
│   └── _SUCCESS
└── min-max-scaler
    └── metadata
        ├── part-00000
        └── _SUCCESS

您现在可以检查以libsvm格式的那些文件的内容:

$ cat /tmp/spark/data/part-0000*
1.1 1:1.23 3:4.56
0.0 1:1.01 2:2.02 3:3.03

现在让我们加载该数据并应用:

loadedData = MLUtils.loadLibSVMFile(sc, temp_path + "data")
loadedDataDF = spark.createDataFrame(loadedData.map(lambda lp : (lp.label, lp.features.asML())), ['label','features'])
loadedDataDF.show(truncate=False)
# +-----+----------------------------+                                            
# |label|features                    |
# +-----+----------------------------+
# |1.1  |(3,[0,2],[1.23,4.56])       |
# |0.0  |(3,[0,1,2],[1.01,2.02,3.03])|
# +-----+----------------------------+

注意将mllib Vectors转换为ML Vectors非常重要。您可以在这里阅读更多有关它的信息。

model.transform(loadedDataDF).show(truncate=False)
# +-----+----------------------------+-------------+
# |label|features                    |scaled       |
# +-----+----------------------------+-------------+
# |1.1  |(3,[0,2],[1.23,4.56])       |[1.0,0.0,1.0]|
# |0.0  |(3,[0,1,2],[1.01,2.02,3.03])|[0.0,1.0,0.0]|
# +-----+----------------------------+-------------+

我希望这可以回答您的问题!

最新更新