我正在使用以下命令在 Flink 版本 1.1.3 中尝试 ALS 代码:
mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-scala
-DarchetypeVersion=1.1.3
-DgroupId=org.apache.flink.quickstart
-DartifactId=flink-scala-project
-Dversion=0.1
-Dpackage=org.apache.flink.quickstart
-DinteractiveMode=false
我正在遵循以下示例代码:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/ml/als.html 并更改了数据集中长长的 Int
val env = ExecutionEnvironment.getExecutionEnvironment
val csvInput: DataSet[(Long, Long, Double)] = env.readCsvFile[(Long, Long, Double)]("tmp-contactos.csv")
// Setup the ALS learner
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(100)
// Set the other parameters via a parameter map
val parameters = ParameterMap()
.add(ALS.Lambda, 0.9)
.add(ALS.Seed, 42L)
// Calculate the factorization
als.fit(csvInput, parameters)
但它抛出了符文时间:
Exception in thread "main" java.lang.RuntimeException: There is no FitOperation defined for org.apache.flink.ml.recommendation.ALS which trains on a DataSet[(Long, Int, Double)]
at org.apache.flink.ml.pipeline.Estimator$$anon$4.fit(Estimator.scala:85)
at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
at org.apache.flink.quickstart.BatchJob$.main(BatchJob.scala:119)
at org.apache.flink.quickstart.BatchJob.main(BatchJob.scala)
有可能使用长整型而不是整数??
我搜索并找到了 0.9 版本,但没有找到 1.1.13 版本:
https://issues.apache.org/jira/browse/FLINK-2211
它还没有得到官方支持,但我已经创建了一个分支,我已经修复了这个限制。您可以尝试此分支。我会把它贡献给 Flink,这样它下次应该成为主节点的一部分。