I am new to Scala. I have two RDDs, and I need to separate the training and test data. One file contains all of the data, while the other contains only the test data. I need to remove the test data from the complete dataset.
The complete data file has the format (userID, movID, rating, timestamp):
res8: Array[String] = Array(1, 31, 2.5, 1260759144)
The test data file has the format (userID, movID):
res10: Array[String] = Array(1, 1172)
How do I generate ratings_train containing the cases that do not match the test dataset? I am using the following function, but the returned list comes back empty:
def create_training(data: RDD[String], ratings_test: RDD[String]): ListBuffer[Array[String]] = {
  val ratings_split = dropheader(data).map(line => line.split(","))
  val ratings_testing = dropheader(ratings_test).map(line => line.split(",")).collect()
  var ratings_train = new ListBuffer[Array[String]]()

  ratings_split.foreach(x => {
    ratings_testing.foreach(y => {
      if (x(0) != y(0) || x(1) != y(1)) {
        ratings_train += x
      }
    })
  })
  return ratings_train
}
Edit: I changed the code, but it runs into memory issues. I thought this might work:
def create_training(data: RDD[String], ratings_test: RDD[String]): Array[Array[String]] = {
  val ratings_split = dropheader(data).map(line => line.split(","))
  val ratings_testing = dropheader(ratings_test).map(line => line.split(","))

  ratings_split.filter(x => {
    ratings_testing.exists(y =>
      (x(0) == y(0) && x(1) == y(1))
    ) == false
  })
}
- The code snippet you posted is not logically correct. A row should be part of the final data only if it is not present in the test data. But in your code, a row is selected whenever it differs from *some* row of the test data. Instead you should check that it differs from *all* test rows; only then can you be sure it is a valid row.
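In other words, the per-row condition is a forall, not an exists. A minimal sketch of the corrected logic on plain Scala arrays (outside Spark, with made-up sample rows, just to illustrate the condition):

```scala
object FilterLogic {
  def main(args: Array[String]): Unit = {
    val ratings = Array(
      Array("1", "31", "2.5", "1260759144"),
      Array("1", "1172", "4.0", "1260759200"))
    val tests = Array(Array("1", "1172"))

    // Keep a row only if it differs from EVERY test row (forall),
    // not whenever it differs from SOME test row (the buggy version).
    val train = ratings.filter(x =>
      tests.forall(y => x(0) != y(0) || x(1) != y(1)))

    train.foreach(row => println(row.mkString(",")))
    // prints only "1,31,2.5,1260759144"
  }
}
```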
- You are using RDDs but not exploring their full power. I assume you are reading the input from CSV files. You can then structure the data inside the RDD instead of splitting the string on commas and handling the rows manually. Have a look at Spark's DataFrame API. These links may help: https://www.tutorialspoint.com/spark_sql/spark_sql_dataframes.htm, https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes
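A rough sketch of the DataFrame route (assuming a SparkSession and hypothetical file paths; the point is that a left anti join keeps exactly the rows of the full data that have no (userId, movieId) match in the test data):

```scala
import org.apache.spark.sql.SparkSession

object SplitTraining {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("split-training").getOrCreate()

    // Hypothetical file names; adjust to your actual paths and column names.
    val ratings = spark.read.option("header", "true")
      .csv("ratings.csv")        // userId, movieId, rating, timestamp
    val ratingsTest = spark.read.option("header", "true")
      .csv("ratings_test.csv")   // userId, movieId

    // left_anti keeps rows of `ratings` with no match in `ratingsTest`
    // on the join columns, i.e. the training set.
    val ratingsTrain = ratings.join(ratingsTest, Seq("userId", "movieId"), "left_anti")

    ratingsTrain.write.option("header", "true").csv("ratings_train.csv")
    spark.stop()
  }
}
```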
Using a regex:
def main(args: Array[String]): Unit = {
  // creating test data set
  val data = spark.sparkContext.parallelize(Seq(
    // "userID, MovID, Rating, Timestamp",
    "1, 31, 2.5, 1260759144",
    "2, 31, 2.5, 1260759144"))

  val ratings_test = spark.sparkContext.parallelize(Seq(
    // "userID, MovID",
    "1, 31",
    "2, 30",
    "30, 2"))

  val result = getData(data, ratings_test).collect()
  // the result will only contain "2, 31, 2.5, 1260759144"
}
def getData(data: RDD[String], ratings_test: RDD[String]): RDD[String] = {
  val ratings = dropheader(data)
  val ratings_testing = dropheader(ratings_test)

  // Broadcast the test rating data to all Spark nodes, since we are
  // collecting it beforehand. The reason we collect the test data is
  // to avoid calling collect inside the filter logic.
  val ratings_testing_bc = spark.sparkContext.broadcast(ratings_testing.collect.toSet)

  ratings.filter(rating => {
    !ratings_testing_bc.value.exists(testRating => regexMatch(rating, testRating))
  })
}
def regexMatch(data: String, testData: String): Boolean = {
  // Regular expression capturing the first two columns
  val regex = """^([^,]*), ([^,\r\n]*),?""".r

  val (dataCol1, dataCol2) = regex findFirstIn data match {
    case Some(regex(col1, col2)) => (col1, col2)
  }
  val (testDataCol1, testDataCol2) = regex findFirstIn testData match {
    case Some(regex(col1, col2)) => (col1, col2)
  }
  (dataCol1 == testDataCol1) && (dataCol2 == testDataCol2)
}