Apache Mahout SimilarityAnalysis for CCO throws NegativeArraySizeException



When calling Apache Mahout's SimilarityAnalysis for CCO, I get a fatal NegativeArraySizeException.

The code I am running looks like this:

val result = SimilarityAnalysis.cooccurrencesIDSs(myIndexedDataSet:Array[IndexedDataset],
      randomSeed = 1234,
      maxInterestingItemsPerThing = 3,
      maxNumInteractions = 4)

I see the following error and the corresponding stack trace:

17/04/19 20:49:09 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 20)
java.lang.NegativeArraySizeException
    at org.apache.mahout.math.DenseVector.<init>(DenseVector.java:57)
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:73)
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/04/19 20:49:09 ERROR Executor: Exception in task 1.0 in stage 11.0 (TID 21)
java.lang.NegativeArraySizeException
    at org.apache.mahout.math.DenseVector.<init>(DenseVector.java:57)
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:73)
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/04/19 20:49:09 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 20, localhost): java.lang.NegativeArraySizeException
    at org.apache.mahout.math.DenseVector.<init>(DenseVector.java:57)
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:73)
    at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:72)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I am using Apache Mahout version 0.13.0.

This always indicates that one of the input matrices is empty. How many matrices are in the array, and how many rows does each have? There is a companion object for IndexedDatasetSpark that supplies a constructor (called apply in Scala) taking an RDD[(String, String)], so if you can get your data into an RDD you can just construct the IndexedDatasetSpark from it. Here the string pairs are user-id and item-id for some event such as a purchase.
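As a quick way to answer those questions, here is a minimal diagnostic sketch, assuming myIndexedDataSet is the Array[IndexedDataset] passed to cooccurrencesIDSs above, that prints each input matrix's dimensions so an empty one stands out:

// Print the dimensions of every IndexedDataset handed to SimilarityAnalysis;
// an empty matrix (0 rows or 0 columns) is the usual cause of this exception.
myIndexedDataSet.zipWithIndex.foreach { case (ids, i) =>
  println(s"input $i: ${ids.matrix.nrow} rows x ${ids.matrix.ncol} columns")
}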

See the companion object here: https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala#L75

A little searching will turn up code that converts a CSV into an RDD[(String, String)] in a line or so. It will look something like this:

val rawPurchaseInteractions = sc.textFile("/path/in/hdfs").map { line =>
  (line.split(",")(0), line.split(",")(1))
}

Although this splits each line twice, it expects a comma-delimited text file of user-id,item-id pairs for some type of interaction, such as "purchase". If there are other fields in the file, just split out the user-id and item-id. The line in the map function returns a pair of Strings, so the resulting RDD is of the correct type, namely RDD[(String, String)]. Pass it to IndexedDatasetSpark with:

val purchasesRdd = IndexedDatasetSpark(rawPurchaseInteractions)(sc)

where sc is your SparkContext. This should give you a non-empty IndexedDatasetSpark, which you can check by looking at the size of the wrapped BiDictionarys or by calling methods on the wrapped Mahout DRM.
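For example, a sketch of such a check, assuming the standard IndexedDataset members matrix, rowIDs, and columnIDs (and that BiDictionary exposes its size):

// The row/column counts come from the wrapped Mahout DRM; the BiDictionarys
// map external user/item IDs to Mahout's internal integer indices.
println(s"DRM: ${purchasesRdd.matrix.nrow} rows x ${purchasesRdd.matrix.ncol} columns")
println(s"users: ${purchasesRdd.rowIDs.size}, items: ${purchasesRdd.columnIDs.size}")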

BTW, this assumes the CSV has no header. It is text-delimited, not full-spec CSV; Spark has other methods for reading real CSV, but that is probably not necessary here.
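If your file does have a header line, or extra fields per line, here is a sketch of handling that with the plain RDD API (the header text and field positions are assumptions about your file layout):

// Skip a header such as "user_id,item_id" and split each line only once,
// keeping the user-id and item-id columns for the chosen interaction type.
val lines  = sc.textFile("/path/in/hdfs")
val header = lines.first()
val rawPurchaseInteractions = lines
  .filter(_ != header)
  .map { line =>
    val fields = line.split(",")
    (fields(0), fields(1)) // (user-id, item-id)
  }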

The problem actually had nothing to do with Mahout; it was an earlier line:

inputRDD.filter(_ (1) == primaryFilter).map(o => (o(0), o(2)))

The indices were off: I was using 1 through 3 instead of 0 through 2. I assumed the problem was inside Mahout because of where the exception was thrown, but this turned out to be the real issue.
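For illustration, a hypothetical reconstruction of that off-by-one (the field layout is an assumption; only the corrected line above comes from the actual code):

// Each parsed line has fields indexed 0..2, e.g. (user-id, event, item-id).
// Off-by-one version: reads the wrong columns and indexes past the end of the
// array, which later surfaced as the NegativeArraySizeException inside Mahout:
//   inputRDD.filter(_(2) == primaryFilter).map(o => (o(1), o(3)))
// Corrected version, using 0 through 2 as shown above:
inputRDD.filter(_(1) == primaryFilter).map(o => (o(0), o(2)))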
