How can we pass a variable to a where clause in a Spark DataFrame?



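I am trying to pass the variable SCD_filter to a where clause on a Spark DataFrame. This fails with an error, although the same condition works when I pass it directly. I want to pass the filter as a variable so that it can be set dynamically for different scenarios in the future.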

val SCD_filter = """currentDF.col("u_business_unit") <=> updatedDF.col("u_business_unit")
|      and(currentDF.col("u_operation_level_2") <=> updatedDF.col("u_operation_level_2"))
|      and(currentDF.col("u_operation_level_3") <=> updatedDF.col("u_operation_level_3"))""".stripMargin

I then pass the variable to the code below:

val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("currentDF.*")
.where(s"$SCD_filter")  /// passing the variable which is causing the error
.show()

The error I receive:

Exception in thread "main" org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: Database 'currentdf' not found;

Note: currentDF itself is fine. The code runs when the variable is removed and the condition is passed directly to the where clause.

Try this:

val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF, currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("*")
.where(s"Column1 = '$data'")  // the s interpolator substitutes the value of data into the SQL expression string
.show()

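Create aliases for the DataFrames and refer to the columns through those aliases inside the string, as alias.column_name. A string passed to where is parsed as a Spark SQL expression rather than Scala code, so currentDF.col("...") is not evaluated as a method call. The parser appears to read currentDF as a database qualifier, which would explain the NoSuchDatabaseException.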

val SCD_filter = """
(
(currentDF.u_business_unit <=> updatedDF.u_business_unit) and 
(currentDF.u_operation_level_2 <=> updatedDF.u_operation_level_2) and
(currentDF.u_operation_level_3 <=> updatedDF.u_operation_level_3)
)
"""
val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF.alias("updatedDF"), currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.select("currentDF.*")
.where(SCD_filter)
.show()
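
If the filter string has to differ per table, one option is to generate it from a list of column names instead of writing it by hand. The following is only a sketch: buildFilter is a hypothetical helper (not part of Spark), and it assumes both DataFrames are aliased in the join as currentDF and updatedDF.

```scala
// Hypothetical helper: builds a null-safe equality filter as a Spark SQL
// expression string. `left` and `right` are the DataFrame aliases used in the join.
def buildFilter(
    columns: Seq[String],
    left: String = "currentDF",
    right: String = "updatedDF"): String =
  columns.map(c => s"($left.$c <=> $right.$c)").mkString(" and ")

// Column names taken from the question.
val SCD_filter: String =
  buildFilter(Seq("u_business_unit", "u_operation_level_2", "u_operation_level_3"))

// SCD_filter now holds:
// (currentDF.u_business_unit <=> updatedDF.u_business_unit) and (currentDF.u_operation_level_2 <=> updatedDF.u_operation_level_2) and (currentDF.u_operation_level_3 <=> updatedDF.u_operation_level_3)
```

The resulting string can be passed to .where(SCD_filter) in the same way as the hand-written one.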

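Based on your comment:

Actually, this code will run for 5 tables. All of them have different columns, and all the conditions will be different. I am also open to a better approach.

I would solve it with something like this: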


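import org.apache.spark.sql.functions._
import spark.implicits._  // enables the $"..." column syntax; assumes a SparkSession named spark
// Qualify both sides with the DataFrame aliases so the column references are not ambiguous
val cond1 = $"currentDF.u_business_unit"     <=> $"updatedDF.u_business_unit"
val cond2 = $"currentDF.u_operation_level_2" <=> $"updatedDF.u_operation_level_2"
val cond3 = $"currentDF.u_operation_level_3" <=> $"updatedDF.u_operation_level_3"
val SCD_filter = cond1.and(cond2).and(cond3)
val common_unchangedata = currentDF.alias("currentDF")
.join(updatedDF.alias("updatedDF"), currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
.where(SCD_filter)
.show()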

You can extend this into a function getCondition(tableName: String): Column that constructs the appropriate condition at runtime, based on the kind of data you are working with.
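
A minimal sketch of what such a function could look like is given here. The compareColumns map, the table name business_unit_table, and the wrapper unchangedRows are all made up for illustration. Only the three column names and the Sys_id join key come from the question, and the DataFrames are assumed to be aliased as currentDF and updatedDF.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

// Hypothetical configuration: which columns to compare for each table.
// The table name is invented; add one entry per table.
val compareColumns: Map[String, Seq[String]] = Map(
  "business_unit_table" ->
    Seq("u_business_unit", "u_operation_level_2", "u_operation_level_3")
)

// Builds a null-safe equality condition over all compare columns of a table.
// Assumes the join aliases the DataFrames as "currentDF" and "updatedDF".
def getCondition(tableName: String): Column = {
  val columns = compareColumns.getOrElse(
    tableName,
    throw new IllegalArgumentException(s"No compare columns configured for: $tableName"))
  require(columns.nonEmpty, s"Empty column list for: $tableName")
  columns
    .map(c => col(s"currentDF.$c") <=> col(s"updatedDF.$c"))
    .reduce(_ && _) // combine the per-column conditions with AND
}

// Hypothetical wrapper: rows of currentDF whose compare columns are unchanged.
def unchangedRows(currentDF: DataFrame, updatedDF: DataFrame, tableName: String): DataFrame =
  currentDF.alias("currentDF")
    .join(updatedDF.alias("updatedDF"),
      currentDF.col("Sys_id") === updatedDF.col("Sys_id"), "inner")
    .where(getCondition(tableName))
    .select("currentDF.*")
```

Calling unchangedRows(currentDF, updatedDF, "business_unit_table").show() would then apply the condition for that table. Keeping the column lists in a map means a new table needs only a new entry, not a new filter.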
