我正在使用Spark 1.3。我有一个数据集,其中列(ordering_date列)中的日期采用 yyyy/MM/dd 格式。我想用日期做一些计算,因此我想使用 jodatime 进行一些转换/格式化。这是我拥有的 udf:
val return_date = udf((str: String, dtf: DateTimeFormatter) => dtf.formatted(str))
下面是调用 udf 的代码。但是,我收到错误说"不适用"。我需要注册此 UDF 还是我在这里遗漏了什么?
val user_with_dates_formatted = users.withColumn(
"formatted_date",
return_date(users("ordering_date"), DateTimeFormat.forPattern("yyyy/MM/dd")
)
我不相信你可以将DateTimeFormatter
作为UDF
的参数传递。您只能通过Column
.一种解决方案是执行以下操作:
val return_date = udf((str: String, format: String) => {
DateTimeFormat.forPatten(format).formatted(str))
})
然后:
val user_with_dates_formatted = users.withColumn(
"formatted_date",
return_date(users("ordering_date"), lit("yyyy/MM/dd"))
)
老实说,这和你的原始算法都有同样的问题。它们都对每条记录使用 forPattern
来解析yyyy/MM/dd
。最好是创建一个包裹在Map[String,DateTimeFormatter]
上的单例对象,也许像这样(完全未经测试,但你明白了):
object DateFormatters {
var formatters = Map[String,DateTimeFormatter]()
def getFormatter(format: String) : DateTimeFormatter = {
if (formatters.get(format).isEmpty) {
formatters = formatters + (format -> DateTimeFormat.forPattern(format))
}
formatters.get(format).get
}
}
然后,您可以将UDF
更改为:
val return_date = udf((str: String, format: String) => {
DateFormatters.getFormatter(format).formatted(str))
})
这样,每个执行程序的每种格式只调用一次DateTimeFormat.forPattern(...)
。
关于单例对象解决方案需要注意的一点是,您不能在spark-shell
中定义对象 - 您必须将其打包在 JAR 文件中,并使用 --jars
选项来spark-shell
是否要在 shell 中使用DateFormatters
对象。