ApacheFlink-svm对流数据的预测



我正在使用Apache Flink来预测来自Twitter的流。

代码在Scala 中实现

我的问题是,我从数据集API训练的SVM-Model需要一个数据集作为predict((-方法的输入。

我在这里已经看到了一个问题,一位用户说,你需要编写一个自己的MapFunction,它在工作开始时读取模型(参考:使用scala在Flink中进行实时流预测(

但是我不能写/理解这个代码。

即使我在StreamingMapFunction中得到了模型。我仍然需要一个数据集作为参数来预测结果。

我真的希望有人能向我展示/解释这是怎么做到的。

Flink版本:1.9Scala版本:2.11Flink ML:2.11

val strEnv = StreamExecutionEnvironment.getExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
//this is my Model including all the terms to calculate the tfidf-values and to create a libsvm
val featureVectorService = new FeatureVectorService
featureVectorService.learnTrainingData(labeledData, false)
//reads the created libsvm
val trainingData: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "...")
val svm = SVM()
.setBlocks(env.getParallelism)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
.setSeed(42)
//learning
svm.fit(trainingData)
//this is my twitter stream - text should be predicted later
val streamSource: DataStream[String] = strEnv.addSource(new TwitterSource(params.getProperties))
//the texts i want to transform to tfidf using the service upon and give it the svm to predict
val tweets: DataStream[(String, String)] = streamSource
.flatMap(new SelectEnglishTweetWithCreatedAtFlatMapper)

因此,SVM所属的FlinkML目前不支持流式API。这就是为什么SVM只接受DataSet。这个想法不是使用FlinkML,而是使用scala或java中的一些SVM库。然后您可以读取模型,例如从文件中读取。问题是,你必须自己实现大部分逻辑。

你提到的帖子中的评论或多或少都在说同样的话。

相关内容

  • 没有找到相关文章

最新更新