我需要使用Spark纠正某些拼写。不幸的是,一种天真的方法,例如
val misspellings3 = misspellings1
.withColumn("A", when('A === "error1", "replacement1").otherwise('A))
.withColumn("A", when('A === "error1", "replacement1").otherwise('A))
.withColumn("B", when(('B === "conditionC") and ('D === condition3), "replacementC").otherwise('B))
不使用Spark如何根据条件添加新列(不面对JaninoruntimeException或OutofMemoryError)?
可以通过
很好地处理简单的情况(前两个示例)val spellingMistakes = Map(
"error1" -> "fix1"
)
val spellingNameCorrection: (String => String) = (t: String) => {
titles.get(t) match {
case Some(tt) => tt // correct spelling
case None => t // keep original
}
}
val spellingUDF = udf(spellingNameCorrection)
val misspellings1 = hiddenSeasonalities
.withColumn("A", spellingUDF('A))
,但我不确定如何处理NICE&可概括的方式。如果这只是拼写&LT的一小部分;50您是否建议在UDF中进行硬编码?
您可以使UDF接收多个列:
val spellingCorrection2= udf((x: String, y: String) => if (x=="conditionC" && y=="conditionD") "replacementC" else x)
val misspellings3 = misspellings1.withColumn("B", spellingCorrection2($"B", $"C")
为了使它更具概括性,您可以使用两个条件的元组的地图到与第一种情况相同的字符串。
如果您想更多地概括它,则可以使用数据集映射。基本上可以使用相关列创建一个案例类,然后将数据框架转换为案例类的数据集。然后使用数据集映射,然后在输入数据上使用模式匹配来生成相关校正并转换回数据框架。这应该更容易编写,但会有性能成本。
现在,我将继续使用以下内容,这似乎效果很好,更易于理解:https://gist.github.com/rchukh/84ac39310b384abedb89c2999999b24b24b9306
>如果SpellingMap是包含正确拼写的地图,而DF是DataFrame。
val df: DataFrame = _
val spellingMap = Map.empty[String, String] //fill it up yourself
val columnsWithSpellingMistakes = List("abc", "def")
像这样写一个UDF
def spellingCorrectionUDF(spellingMap:Map[String, String]) =
udf[(String), Row]((value: Row) =>
{
val cellValue = value.getString(0)
if(spellingMap.contains(cellValue)) spellingMap(cellValue)
else cellValue
})
最后,您可以称其为
val newColumns = df.columns.map{
case columnName =>
if(columnsWithSpellingMistakes.contains(columnName)) spellingCorrectionUDF(spellingMap)(Column(columnName)).as(columnName)
else Column(columnName)
}
df.select(newColumns:_*)