假设这种情况:
我们分析数据,使用手头的任何工具训练一些机器学习模型,并保存这些模型。这是在Python中完成的,使用Apache Spark python shell和API。我们知道 Apache Spark 擅长批处理,因此是 aboce 场景的不错选择。
现在投入生产,对于每个给定的请求,我们需要返回一个响应,该响应也取决于训练模型的输出。我认为,这就是人们所说的流处理,通常建议使用 Apache Flink。但是,您将如何在 Flink 管道中使用 Python 中可用的工具训练的相同模型呢?
Spark 的微批处理模式在这里不起作用,因为我们确实需要响应每个请求,而不是批量响应。
我也看到一些库试图在 Flink 中进行机器学习,但这并不能满足那些在 Python 而不是 Scala 中拥有各种工具的人的需求,甚至不熟悉 Scala。
所以问题是,人们如何处理这个问题?
这个问题是相关的,但不是重复的,因为作者在那里明确提到使用 Spark MLlib。该库在JVM上运行,并且更有可能移植到其他基于JVM的平台。但这里的问题是,如果让人们说他们正在使用scikit-learn
,或GPy
或他们使用的任何其他方法/包,人们将如何处理它。
我需要一种方法来为 mlPipeline
创建自定义Transformer
,并将该自定义对象与管道的其余部分一起保存/加载。这促使我深入研究spark
模型序列化/反序列化的非常丑陋的深度。简而言之,看起来所有spark
ml模型都有两个组件metadata
和model data
其中模型数据是.fit()
期间学习的参数。元数据保存在模型 save dir 下名为metadata
的目录中,据我所知json
,所以这应该不是问题。模型参数本身似乎只是作为保存目录中的parquet
文件保存。这是保存 LDA 模型的实现
override protected def saveImpl(path: String): Unit = {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val oldModel = instance.oldLocalModel
val data = Data(instance.vocabSize, oldModel.topicsMatrix, oldModel.docConcentration,
oldModel.topicConcentration, oldModel.gammaShape)
val dataPath = new Path(path, "data").toString
sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
}
请注意最后一行的sqlContext.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
。所以好消息是你可以将文件加载到你的Web服务器中,如果服务器在Java/Scala中,你只需要将spark jars保留在类路径中。
但是,如果您将python
用于Web服务器,则可以使用python的镶木地板库(https://github.com/jcrobak/parquet-python),坏消息是拼花文件中的部分或全部对象将是二进制Java转储,因此您实际上无法在python中读取它们。我想到了几个选项,使用Jython
(meh),使用Py4J加载对象,这是pyspark用来与JVM通信的,所以这实际上可以工作。不过,我不希望这完全简单明了。
或者从链接的问题中使用jpmml-spark
并希望最好的。
看看 MLeap。
我们在将Spark上学到的模型外部化为单独的服务方面取得了一些成功,这些服务提供对新传入数据的预测。我们将LDA主题建模管道外部化,尽管是在Scala中。但他们确实有python支持,所以值得一看。