Reading a text file in Scala with map and filter



I have a text file with the following format (id,f1,f2,f3,...,fn):

12345,0,0,1,2,...,3
23456,0,0,1,2,...,0
33333,0,1,1,0,...,0
56789,1,0,0,0,...,4
a_123,0,0,0,6,...,3

I want to read the file (ignoring lines like a_123,0,0,0,6,...,3) to create an RDD[(Long, Vector)]. Here is my solution:

  def readDataset(path: String, sparkSession: SparkSession): RDD[(ItemId, Vector)] = {
    val sc = sparkSession.sparkContext
    sc.textFile(path)
      .map { line =>
        val values = line.split(",")
        (
          values(0).toLong,
          //util.Try(values(0).toLong).getOrElse(0L),
          Vectors.dense(values.slice(1, values.length).map(_.toDouble)).toSparse
        )
      }
      .filter(x => x._1 > 0)
  }

However, this code does not compile:

[ERROR]  found   : org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.SparseVector)]
[ERROR]  required: org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)]
[ERROR]     (which expands to)  org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)]
[ERROR] Note: (Long, org.apache.spark.ml.linalg.SparseVector) <: (Long, org.apache.spark.ml.linalg.Vector), but class RDD is invariant in type T.
[ERROR] You may wish to define T as +T instead. (SLS 4.5)
[ERROR]       .filter(x => x._1 > 0)
[ERROR]              ^
[ERROR] one error found

However, if I remove either .toSparse or .filter(x => x._1 > 0), the code compiles successfully.

Does anyone know why, and what should I do?

Also, is there a better way to read the file while ignoring rows with non-numeric IDs?

If you remove toSparse, the code compiles successfully because the type of your PairRDD is then (ItemId, Vector).

The org.apache.spark.ml.linalg.Vector class/type represents the dense vector you generated with Vectors.dense. When you call toSparse, it is converted into an org.apache.spark.ml.linalg.SparseVector, which is not the type your pair expects. Since RDD is invariant in its element type, an RDD[(Long, SparseVector)] is not accepted where an RDD[(Long, Vector)] is required, even though SparseVector is a subtype of Vector.
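The invariance issue can be reproduced without Spark at all. The sketch below uses a hypothetical invariant Container standing in for RDD, and a tiny Vec/SparseVec hierarchy standing in for the ml.linalg types; none of these names come from Spark, they only illustrate the mechanism:

```scala
// A tiny stand-in hierarchy: SparseVec <: Vec, like SparseVector <: Vector.
trait Vec { def size: Int }
case class SparseVec(size: Int) extends Vec

// Invariant in T, like RDD[T]: Container[SparseVec] is NOT a Container[Vec].
case class Container[T](value: T)

object InvarianceDemo {
  def main(args: Array[String]): Unit = {
    // This would reproduce the question's error and does not compile:
    // val bad: Container[Vec] = Container(SparseVec(3))

    // The fix: ascribe the element to the supertype up front, so the
    // container is constructed with the wider type from the start.
    val ok: Container[Vec] = Container(SparseVec(3): Vec)
    println(ok.value.size)
  }
}
```

Applied to the original snippet, the same idea is a type ascription inside the map, e.g. `...toSparse: Vector`, so the pair is typed (Long, Vector) and the resulting RDD matches the declared return type.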

As for filtering out non-integer IDs, I would say your approach is a good way to do it.
