我有一个要求,其中我需要从Spark DataFrame中滤除行,其中某个列的值(例如"价格")需要与Scala映射中存在的值匹配。Scala映射的关键是另一列的值(例如" ID")。我的数据帧包含两个列:ID和价格。我需要过滤出所有价格与Scala Map中提到的价格不匹配的所有列。
我的代码类似于:
object obj1{
// This method returns value price for items as per their id
getPrice(id:String):String {
//lookup in a map and return the price
}
}
object Main{
val validIds = Seq[String]("1","2","3","4")
val filteredDf = baseDataframe.where(baseDataframe("id").in(validIDs.map(lit(_)): _*) &&
baseDataframe("price") === (obj1.getPrice(baseDataframe("id").toString())))
// But this line send string "id" to obj1.getPrice() function
// rather than value of id column
}
}
我无法将ID列的值传递给函数obj1.getPrice()。有什么建议如何实现这一目标?
谢谢,
您可以写一个UDF来做到这一点:
val checkPrice(id: String, price: String) = validIds.exists(_ == id) && obj1.getPrice(id) == price
val checkPriceUdf = udf(checkPrice)
baseDataFrame.where(checkPriceUdf($"id", $"price"))
或其他解决方案是将 id-> Price 的Map
转换为数据框架,然后在id
和price
列上使用baseDataFrame
进行内部连接。