使用数据框激发 UDF



我正在使用Spark 1.3。我有一个数据集,其中列(ordering_date列)中的日期采用 yyyy/MM/dd 格式。我想用日期做一些计算,因此我想使用 jodatime 进行一些转换/格式化。这是我拥有的 udf:

 val return_date = udf((str: String, dtf: DateTimeFormatter) => dtf.formatted(str))

下面是调用 udf 的代码。但是,我收到错误说"不适用"。我需要注册此 UDF 还是我在这里遗漏了什么?

val user_with_dates_formatted = users.withColumn(
  "formatted_date",
  return_date(users("ordering_date"), DateTimeFormat.forPattern("yyyy/MM/dd")
)

我不相信你可以将DateTimeFormatter作为UDF的参数传递。您只能通过Column.一种解决方案是执行以下操作:

val return_date = udf((str: String, format: String) => {
  DateTimeFormat.forPatten(format).formatted(str))
})

然后:

val user_with_dates_formatted = users.withColumn(
  "formatted_date",
  return_date(users("ordering_date"), lit("yyyy/MM/dd"))
)

老实说,这和你的原始算法都有同样的问题。它们都对每条记录使用 forPattern 来解析yyyy/MM/dd。最好是创建一个包裹在Map[String,DateTimeFormatter]上的单例对象,也许像这样(完全未经测试,但你明白了):

object DateFormatters {
  var formatters = Map[String,DateTimeFormatter]()
  def getFormatter(format: String) : DateTimeFormatter = {
    if (formatters.get(format).isEmpty) {
      formatters = formatters + (format -> DateTimeFormat.forPattern(format))
    }
    formatters.get(format).get
  }
}

然后,您可以将UDF更改为:

val return_date = udf((str: String, format: String) => {
  DateFormatters.getFormatter(format).formatted(str))
})

这样,每个执行程序的每种格式只调用一次DateTimeFormat.forPattern(...)

关于单例对象解决方案需要注意的一点是,您不能在spark-shell中定义对象 - 您必须将其打包在 JAR 文件中,并使用 --jars 选项来spark-shell是否要在 shell 中使用DateFormatters对象。

相关内容

  • 没有找到相关文章

最新更新