这应该很容易,但。。。。使用Spark 1.6.1。。。。我有DataFrame#1,列为A、B、C。值:
A B C
1 2 A
2 2 A
3 2 B
4 2 C
然后,我创建了一个新的数据帧,其中有一个新列D,所以:
DataFrame df2 = df1.withColumn("D", df1.col("C"));
到目前为止还不错,但我实际上希望D列中的值是有条件的,即:
// pseudo code
if (col C = "A") the col D = "X"
else if (col C = "B") the col D = "Y"
else col D = "Z"
然后,我将删除C列,并将D重命名为C。我尝试过查看column函数,但似乎没有符合要求的内容;我曾想过使用df1.rdd().map()并对行进行迭代,但除了不能真正使其工作之外,我有点认为DataFrames的全部目的是远离rdd抽象?
不幸的是,我不得不用Java来做这件事(当然,Spark with Java并不是最佳的!!)。我似乎错过了显而易见的东西,很高兴在得到解决方案时被证明是个白痴!
我相信您可以使用when
来实现这一点。此外,您可能可以直接替换旧列。对于您的示例,代码将类似于:
import static org.apache.spark.sql.functions.*;
Column newCol = when(col("C").equalTo("A"), "X")
.when(col("C").equalTo("B"), "Y")
.otherwise("Z");
DataFrame df2 = df1.withColumn("C", newCol);
有关when
的更多详细信息,请查看Column
Javadoc。
感谢Daniel,我解决了这个问题:)
缺少的部分是sql函数的静态导入
import static org.apache.spark.sql.functions.*;
我一定尝试了一百万种不同的使用when的方法,但由于没有进行导入,导致编译失败/运行时错误。丹尼尔的回答一针见血!
您也可以使用udf来完成同样的工作。只需编写一个简单的if-then-else结构
import org.apache.spark.sql.functions.udf
val customFunct = udf { d =>
//if then else construct
}
val new_DF= df.withColumn(column_name, customFunct(df("data_column")))