转换Spark数据框列



我正在使用Spark数据框架。我有一个分类变量在我的数据框架与许多级别。我正在尝试对这个变量进行简单的转换——只选择具有大于n个观察值(例如,1000个)的前几个级别。将所有其他级别的人归为"其他"类别。

我对Spark相当陌生,所以我一直在努力实现这个。这是我到目前为止所能做到的:

# Extract all levels having > 1000 observations (df is the dataframe name) 
val levels_count = df.groupBy("Col_name").count.filter("count >10000").sort(desc("count"))
# Extract the level names
val level_names = level_count.select("Col_name").rdd.map(x => x(0)).collect

这给了我一个数组,其中有我想要保留的关卡名称。接下来,我应该定义可以应用于列的变换函数。这就是我被困住的地方。我认为我们需要创建一个用户定义函数。这是我尝试过的:

# Define UDF
val var_transform = udf((x: String) => {
    if (level_names contains x) x
    else "others"
 })
# Apply UDF to the column
val df_new = df.withColumn("Var_new", var_transform($"Col_name"))

然而,当我尝试 df_new.show 时,它抛出一个"任务不可序列化"异常。我做错了什么?还有,是否有更好的方法来做到这一点?

谢谢!

在我看来,对于这样一个简单的转换,这是一个更好的解决方案:坚持使用DataFrame API并信任catalyst和Tungsten进行优化(例如进行广播连接):

val levels_count = df
  .groupBy($"Col_name".as("new_col_name"))
  .count
  .filter("count >10000")
val df_new = df
  .join(levels_count,$"Col_name"===$"new_col_name", joinType="leftOuter")
  .drop("Col_name")
  .withColumn("new_col_name",coalesce($"new_col_name", lit("other")))

最新更新