我有一个数组作为广播变量,它包含整数:
broadcast_array.value
Array(72159153, 72159163, 72159202, 72159203, 72159238, 72159398, 72159447, 72159448, 72159455, 72159492...
我在数据集中有一个列(呼叫是col_id
,其中包含IntegerType
值可能在broadcast_array
中,但它们可能不在。
我只是尝试创建一个新列(称为new_col
),该列检查是否在broadcast_array
中检查col_id
值是否为CC_5值。如果是这样,新的列值应为 Available
,否则可以是 null
所以我有类似的东西:
val my_new_df = df.withColumn("new_col", when(broadcast_array.value.contains($"col_id"), "Available"))
但我一直遇到此错误:
Name: Unknown Error
Message: <console>:45: error: type mismatch;
found : Boolean
required: org.apache.spark.sql.Column
val my_new_df = df.withColumn("new_col", when(broadcast_array.value.contains($"col_id"), "Available"))
^
StackTrace:
对我来说最令人困惑的是,我认为when
语句需要一个有条件输出一些布尔值的条件,但是在这里说它需要一列。
我应该如何根据是否可以在预定义数组中找到现有列中的值添加一个值?
如果您查看 when
函数的 api
def何时(条件:org.apache.spark.sql.column,value:scala.any):org.apache.spark.sql.column
很明显所需的条件是列,而不是布尔值。
因此,您可以进行复杂的lit
组合,将boolean
转换为
import org.apache.spark.sql.functions._
df.withColumn("new_col", when(lit(broadcast_array.value.mkString(",")).contains($"col_id"), lit("Available"))).show(false)
或
您可以通过编写简单的udf
功能为
import org.apache.spark.sql.functions._
val broadcastContains = udf((id: Int) => broadcast_array.value.contains(id))
,然后将功能称为
df.withColumn("new_col", when(broadcastContains($"col_id"), lit("Available"))).show(false)
我在Spark-daria中添加了broadcastArrayContains
功能,使Ramesh的解决方案更加可重复/可访问。
def broadcastArrayContains[T](col: Column, broadcastedArray: Broadcast[Array[T]]) = {
when(col.isNull, null)
.when(lit(broadcastedArray.value.mkString(",")).contains(col), lit(true))
.otherwise(lit(false))
}
假设您有以下数据框(df
):
+----+
| num|
+----+
| 123|
| hi|
|null|
+----+
您可以标识广播数组中的所有值:
val specialNumbers = spark.sparkContext.broadcast(Array("123", "456"))
df.withColumn(
"is_special_number",
functions.broadcastArrayContains[String](col("num"), specialNumbers)
)