我正在使用Spark数据框架。我有一个分类变量在我的数据框架与许多级别。我正在尝试对这个变量进行简单的转换——只选择具有大于n个观察值(例如,1000个)的前几个级别。将所有其他级别的人归为"其他"类别。
我对Spark相当陌生,所以我一直在努力实现这个。这是我到目前为止所能做到的:
# Extract all levels having > 1000 observations (df is the dataframe name)
val levels_count = df.groupBy("Col_name").count.filter("count >10000").sort(desc("count"))
# Extract the level names
val level_names = level_count.select("Col_name").rdd.map(x => x(0)).collect
这给了我一个数组,其中有我想要保留的关卡名称。接下来,我应该定义可以应用于列的变换函数。这就是我被困住的地方。我认为我们需要创建一个用户定义函数。这是我尝试过的:
# Define UDF
val var_transform = udf((x: String) => {
if (level_names contains x) x
else "others"
})
# Apply UDF to the column
val df_new = df.withColumn("Var_new", var_transform($"Col_name"))
然而,当我尝试 df_new.show
时,它抛出一个"任务不可序列化"异常。我做错了什么?还有,是否有更好的方法来做到这一点?
谢谢!
在我看来,对于这样一个简单的转换,这是一个更好的解决方案:坚持使用DataFrame API并信任catalyst和Tungsten进行优化(例如进行广播连接):
val levels_count = df
.groupBy($"Col_name".as("new_col_name"))
.count
.filter("count >10000")
val df_new = df
.join(levels_count,$"Col_name"===$"new_col_name", joinType="leftOuter")
.drop("Col_name")
.withColumn("new_col_name",coalesce($"new_col_name", lit("other")))