我是Spark SQL和DataFrames的新手。我有一个Dataframe
,我应该根据其他列的值添加一个新列。我有一个来自Excel的Nested IF
公式,我应该实现(用于在新列中添加值),当将其转换为程序化术语时,它就是这样:
if(k =='yes')
{
if(!(i==''))
{
if(diff(max_date, target_date) < 0)
{
if(j == '')
{
"pending" //the value of the column
}
else {
"approved" //the value of the column
}
}
else{
"expired" //the value of the column
}
}
else{
"" //the value should be empty
}
}
else{
"" //the value should be empty
}
i,j,k are three other columns in the Dataframe.
我知道我们可以使用withColumn
和when
根据其他列添加新列,但是我不确定如何使用该方法实现上述逻辑。
实现上述逻辑以添加新列的简便/有效方法是什么?任何帮助将不胜感激。
谢谢。
第一件事,让我们简化该语句:
if(k == "yes" && i.nonEmpty)
if(maxDate - targetDate < 0)
if (j.isEmpty) "pending"
else "approved"
else "expired"
else ""
现在有2种主要方法可以完成此
- 使用自定义UDF
- 使用Spark内置功能:
coalesce
,when
,otherwise
自定义UDF
现在由于您的条件的复杂性,编号2是很棘手的。使用自定义UDF应适合您的需求。
def getState(i: String, j: String, k: String, maxDate: Long, targetDate: Long): String =
if(k == "yes" && i.nonEmpty)
if(maxDate - targetDate < 0)
if (j.isEmpty) "pending"
else "approved"
else "expired"
else ""
val stateUdf = udf(getState _)
df.withColumn("state", stateUdf($"i",$"j",$"k",lit(0),lit(0)))
只需更改LIT(0),然后将(0)更改为您的日期代码,这应该对您有用。
使用Spark内置功能
如果您注意到性能问题,则可以切换到使用coalesce
,otherwise
和when
,它们看起来像这样:
val isApproved = df.withColumn("state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) < 0) && $"j" =!= "", "approved").otherwise(null))
val isPending = isApproved.withColumn("state", coalesce($"state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) < 0) && $"j" === "", "pending").otherwise(null)))
val isExpired = isPending.withColumn("state", coalesce($"state", when($"k" === "yes" && $"i" =!= "" && (lit(max_date) - lit(target_date) >= 0), "expired").otherwise(null)))
val finalDf = isExpired.withColumn("state", coalesce($"state", lit("")))
过去,我已经使用自定义UDF,没有问题的大量输入来源,并且自定义UDF可以导致更可读的代码,尤其是在这种情况下。