结合文件



我是Scala的新手。我有两个RDD,我需要将培训和测试数据分开。在一个文件中,我只有所有数据,而在另一个文件中只有测试数据。我需要从完整的数据集中删除测试数据。

完整的数据文件是格式(用户ID,MOVID,评级,时间戳):

res8: Array[String] = Array(1, 31, 2.5, 1260759144)

测试数据文件是格式(userId,movid):

res10: Array[String] = Array(1, 1172)

我如何生成与测试数据集不匹配的CAE的Ratings_train我正在使用以下功能,但返回的列表显示为空:

  def create_training(data: RDD[String], ratings_test: RDD[String]): ListBuffer[Array[String]] = {
val ratings_split = dropheader(data).map(line => line.split(","))
val ratings_testing = dropheader(ratings_test).map(line => line.split(",")).collect()
var ratings_train = new ListBuffer[Array[String]]()
ratings_split.foreach(x => {
  ratings_testing.foreach(y => {
    if (x(0) != y(0) || x(1) != y(1)) {
      ratings_train += x
    }
  })
})
return ratings_train

}

编辑:更改了代码,但会陷入内存问题。

这可能会起作用。

def create_training(data: RDD[String], ratings_test: RDD[String]): Array[Array[String]] = {
  val ratings_split = dropheader(data).map(line => line.split(","))
  val ratings_testing = dropheader(ratings_test).map(line => line.split(","))
  ratings_split.filter(x => {
    ratings_testing.exists(y =>
      (x(0) == y(0) && x(1) == y(1))
    ) == false
  })
}
  1. 您发布的代码片段在逻辑上不正确。如果一行在测试数据中没有存在时,则仅是最终数据的一部分。但是在代码中,如果行与任何测试数据不匹配,则选择了该行。但是我们应该检查它是否与>所有测试数据>不匹配,然后只有我们才能确定它是否是有效的行。
  2. 您正在使用RDD,但现在探索它们的全部功能。我想您正在阅读CSV文件的输入。然后,您可以在RDD中构造数据,无需根据逗号角色吐出字符串,而是将其作为行手动处理。您可以查看SPARK的数据帧API。这些链接可能会有所帮助:https://www.tutorialspoint.com/spark_sql/spark_sql_sql_dataframes.htm,http://spark.org/docs.org/docs/latest/latest/sql-proginsming-programpl-programming-guide-guide-guide.html#dataasets-and-dataaper-and-dataafremest-dataafremest-dataaframes很多li>

使用REGEX:

  def main(args: Array[String]): Unit = {
    // creating test data set
    val data = spark.sparkContext.parallelize(Seq(
      //      "userID, MovID, Rating, Timestamp",
      "1, 31, 2.5, 1260759144",
      "2, 31, 2.5, 1260759144"))
    val ratings_test = spark.sparkContext.parallelize(Seq(
      //      "userID, MovID",
      "1, 31",
      "2, 30",
      "30, 2"
    ))
    val result = getData(data, ratings_test).collect()
    // the result will only contain "2, 31, 2.5, 1260759144"
  }
  def getData(data: RDD[String], ratings_test: RDD[String]): RDD[String] = {
    val ratings = dropheader(data)
    val ratings_testing = dropheader(ratings_test)
    // Broadcasting the test rating data to all spark nodes, since we are collecting this before hand.
    // The reason we are collecting the test data is to avoid call collect in the filter logic
    val ratings_testing_bc = spark.sparkContext.broadcast(ratings_testing.collect.toSet)
    ratings.filter(rating => {
      ratings_testing_bc.value.exists(testRating => regexMatch(rating, testRating)) == false
    })
  }
  def regexMatch(data: String, testData: String): Boolean = {
    // Regular expression to find first two columns
    val regex = """^([^,]*), ([^,rn]*),?""".r
    val (dataCol1, dataCol2) = regex findFirstIn data match {
      case Some(regex(col1, col2)) => (col1, col2)
    }
    val (testDataCol1, testDataCol2) = regex findFirstIn testData match {
      case Some(regex(col1, col2)) => (col1, col2)
    }
    (dataCol1 == testDataCol1) && (dataCol2 == testDataCol2)
  }

最新更新