I am getting a fatal NegativeArraySizeException when calling Apache Mahout's CCO similarity analysis (SimilarityAnalysis.cooccurrencesIDSs).
The code I am running looks like this:
val result = SimilarityAnalysis.cooccurrencesIDSs(myIndexedDataSet: Array[IndexedDataset],
    randomSeed = 1234,
    maxInterestingItemsPerThing = 3,
    maxNumInteractions = 4)
I see the following error and corresponding stack trace:
17/04/19 20:49:09 ERROR Executor: Exception in task 0.0 in stage 11.0 (TID 20)
java.lang.NegativeArraySizeException
at org.apache.mahout.math.DenseVector.<init>(DenseVector.java:57)
at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:73)
at org.apache.mahout.sparkbindings.SparkEngine$$anonfun$5.apply(SparkEngine.scala:72)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/04/19 20:49:09 ERROR Executor: Exception in task 1.0 in stage 11.0 (TID 21)
java.lang.NegativeArraySizeException
        ... (identical stack trace to the one above)
17/04/19 20:49:09 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 20, localhost): java.lang.NegativeArraySizeException
        ... (identical stack trace to the one above)
I am using Apache Mahout version 0.13.0.
This always indicates that one of the input matrices is empty. How many matrices are in the array? How many rows does each one have? There is a companion object for IndexedDatasetSpark that provides a constructor (an apply method in Scala) taking an RDD[(String, String)], so if you can get your data into an RDD you can just construct the IndexedDatasetSpark from it. Here the string pairs are user-id, item-id for some event such as a purchase.
See the companion object here: https://github.com/apache/mahout/blob/master/spark/src/main/scala/org/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark.scala#L75
A little searching will turn up code that converts a CSV into an RDD[(String, String)] in a line or so. It would look something like this:
val rawPurchaseInteractions = sc.textFile("/path/in/hdfs").map { line =>
  (line.split(",")(0), line.split(",")(1))
}
Although this splits the line twice, it expects a comma-delimited text file of user-id,item-id pairs for a single type of interaction, such as "purchase". If there are other fields in the file, just split out the user-id and item-id. The line in the map function returns a pair of strings, so the resulting RDD has the correct type, RDD[(String, String)]. Pass this to IndexedDatasetSpark with:
val purchasesRdd = IndexedDatasetSpark(rawPurchaseInteractions)(sc)
where sc is your Spark context. This should give you a non-empty IndexedDatasetSpark, which you can check by looking at the sizes of the wrapped BiDictionarys or by calling a method on the wrapped Mahout DRM.
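A sanity check along those lines might look like the sketch below. It assumes IndexedDataset exposes its row and column BiDictionarys as rowIDs and columnIDs (verify against your Mahout version), and it needs a live Spark context:

```scala
// Sketch: verify the IndexedDatasetSpark is non-empty before running CCO.
// rowIDs/columnIDs as BiDictionary accessors is an assumption here.
val ids = IndexedDatasetSpark(rawPurchaseInteractions)(sc)
println(s"rows (users): ${ids.rowIDs.size}, columns (items): ${ids.columnIDs.size}")
require(ids.rowIDs.size > 0 && ids.columnIDs.size > 0,
  "an empty input matrix will surface later as a NegativeArraySizeException")
```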
BTW, this assumes the CSV has no header. It is really text-delimited input, not full-spec CSV. Spark has other methods for reading real CSV files, but that is probably not necessary here.
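If the file does happen to have a header row, one common RDD-level workaround is to filter it out before splitting. A sketch, with a placeholder path and an invented header value:

```scala
// Sketch: drop a header line from a text file read as an RDD.
val lines = sc.textFile("/path/in/hdfs")
val header = lines.first()                 // e.g. "user_id,item_id" (assumed)
val data = lines.filter(_ != header).map { line =>
  (line.split(",")(0), line.split(",")(1))
}
```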
The problem actually had nothing to do with Mahout at all, but with an earlier line:
inputRDD.filter(_ (1) == primaryFilter).map(o => (o(0), o(2)))
The index range was off: I had 1 to 3 instead of 0 to 2. I assumed the problem was inside Mahout because of where the exception surfaced, but this turned out to be the real issue.
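The shifted range can be reproduced in plain Scala, with no Spark or Mahout involved; the sample lines below are invented for illustration:

```scala
// Interaction lines: user-id, event, item-id at indices 0, 1, 2.
val lines = Seq("u1,purchase,i1", "u2,view,i2", "u3,purchase,i3")
val primaryFilter = "purchase"

// Correct indices (0 to 2): filter on field 1, keep fields 0 and 2.
val correct = lines.map(_.split(","))
  .filter(_(1) == primaryFilter)
  .map(o => (o(0), o(2)))

// Off-by-one (1 to 3): field 2 is the item-id, which never equals
// "purchase", so the result is empty. Downstream that empty input
// becomes an empty matrix and a NegativeArraySizeException in Mahout.
val shifted = lines.map(_.split(","))
  .filter(_(2) == primaryFilter)
  .map(o => (o(1), o(3)))
```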