在Spark DataFrame中提取列的值



我有一个要求,其中我需要从Spark DataFrame中滤除行,其中某个列的值(例如"价格")需要与Scala映射中存在的值匹配。Scala映射的关键是另一列的值(例如" ID")。我的数据帧包含两个列:ID和价格。我需要过滤出所有价格与Scala Map中提到的价格不匹配的所有列。

我的代码类似于:

object obj1{
  // This method returns value price for items as per their id
  getPrice(id:String):String {
   //lookup in a map and return the price
  }
}
object Main{    
  val validIds = Seq[String]("1","2","3","4")
  val filteredDf = baseDataframe.where(baseDataframe("id").in(validIDs.map(lit(_)): _*) &&
    baseDataframe("price") === (obj1.getPrice(baseDataframe("id").toString()))) 
  // But this line send string "id" to obj1.getPrice() function
  // rather than value of id column
  }
}

我无法将ID列的值传递给函数obj1.getPrice()。有什么建议如何实现这一目标?

谢谢,

您可以写一个UDF来做到这一点:

val checkPrice(id: String, price: String) = validIds.exists(_ == id) && obj1.getPrice(id) == price
val checkPriceUdf = udf(checkPrice)
baseDataFrame.where(checkPriceUdf($"id", $"price"))

或其他解决方案是将 id-> Price Map转换为数据框架,然后在idprice列上使用baseDataFrame进行内部连接。

相关内容

  • 没有找到相关文章

最新更新