在火花数据框架中,如果另一列中的值在广播变量数组中,则将值添加到新列中



我有一个数组作为广播变量,它包含整数:

broadcast_array.value
Array(72159153, 72159163, 72159202, 72159203, 72159238, 72159398, 72159447, 72159448, 72159455, 72159492...

我在数据集中有一个列(呼叫是col_id,其中包含IntegerType值可能在broadcast_array中,但它们可能不在。

我只是尝试创建一个新列(称为new_col),该列检查是否在broadcast_array中检查col_id值是否为CC_5值。如果是这样,新的列值应为 Available,否则可以是 null

所以我有类似的东西:

val my_new_df = df.withColumn("new_col", when(broadcast_array.value.contains($"col_id"), "Available"))

但我一直遇到此错误:

Name: Unknown Error
Message: <console>:45: error: type mismatch;
 found   : Boolean
 required: org.apache.spark.sql.Column
   val my_new_df = df.withColumn("new_col", when(broadcast_array.value.contains($"col_id"), "Available"))
                                                                                           ^
StackTrace: 

对我来说最令人困惑的是,我认为when语句需要一个有条件输出一些布尔值的条件,但是在这里说它需要一列。

我应该如何根据是否可以在预定义数组中找到现有列中的值添加一个值?

如果您查看 when函数的 api

def何时(条件:org.apache.spark.sql.column,value:scala.any):org.apache.spark.sql.column

很明显所需的条件是列,而不是布尔值

因此,您可以进行复杂的lit组合,将boolean转换为

import org.apache.spark.sql.functions._
df.withColumn("new_col", when(lit(broadcast_array.value.mkString(",")).contains($"col_id"), lit("Available"))).show(false)

您可以通过编写简单的udf功能为

来实现自己的尝试
import org.apache.spark.sql.functions._
val broadcastContains = udf((id: Int) => broadcast_array.value.contains(id))

,然后将功能称为

df.withColumn("new_col", when(broadcastContains($"col_id"), lit("Available"))).show(false)

我在Spark-daria中添加了broadcastArrayContains功能,使Ramesh的解决方案更加可重复/可访问。

def broadcastArrayContains[T](col: Column, broadcastedArray: Broadcast[Array[T]]) = {
  when(col.isNull, null)
    .when(lit(broadcastedArray.value.mkString(",")).contains(col), lit(true))
    .otherwise(lit(false))
}

假设您有以下数据框(df):

+----+
| num|
+----+
| 123|
|  hi|
|null| 
+----+

您可以标识广播数组中的所有值:

val specialNumbers = spark.sparkContext.broadcast(Array("123", "456"))
df.withColumn(
  "is_special_number",
  functions.broadcastArrayContains[String](col("num"), specialNumbers)
)

相关内容

  • 没有找到相关文章

最新更新