Spark:在数据帧的同一列中分析具有不同格式(MM-dd-yyyy HH:MM,MM/dd/yy H:MM)的日期/时



问题是:我有一个数据集,其中一列具有2种或更多类型的日期格式。通常,我会将所有值选择为String类型,然后使用to_date来解析日期。但我不知道如何解析具有两种或两种以上日期格式的列。

val DF= Seq(("02-04-2020 08:02"),("03-04-2020 10:02"),("04-04-2020 09:00"),("04/13/19 9:12"),("04/14/19 2:13"),("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")
import org.apache.spark.sql.functions.{to_date, to_timestamp}
val DOBDF = DF.withColumn("Date", to_date($"DOB", "MM/dd/yyyy"))

上述命令的输出:

null
null
null
0019-04-13
0019-04-14
0019-04-15
0019-04-16

我写的上面的代码不适用于MM/dd/yyyy格式,而我正在获得null作为输出。

因此,寻求帮助来解析具有不同日期格式的文件。如果可能的话,也请分享一些关于处理日期格式的教程或笔记。请注意:我使用Scala作为spark框架。

提前谢谢。

检查EDIT部分,以便在该解决方案的后续部分((中使用Column函数而不是UDF来提高性能

好吧,让我们试试看。。尝试对每种格式进行列转换,并保持成功值。您可能需要从外部提供所有可能的格式作为参数,或者在代码本身的某个位置保留所有可能格式的主列表。。

以下是可能的解决方案。。(我使用了新的库java.time.format.DateTimeFormatter,而不是SimpleDateFormatter,后者有时会在时间戳超过毫秒时出现问题(

创建一个to_timestamp函数,它接受要转换为时间戳的字符串和所有可能的格式

import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import scala.util.Try
def toTimestamp(date: String, tsformats: Seq[String]): Option[java.sql.Timestamp] = {
val out = (for (tsft <- tsformats) yield {
val formatter = new DateTimeFormatterBuilder()
.parseCaseInsensitive()
.appendPattern(tsft).toFormatter()
if (Try(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter))).isSuccess)
Option(java.sql.Timestamp.valueOf(LocalDateTime.parse(date, formatter)))
else None
}).filter(_.isDefined)
if (out.isEmpty) None else out.head
}

在上面创建一个UDF-(这个UDF以格式字符串的顺序为参数(

def UtoTimestamp(tsformats: Seq[String]) = org.apache.spark.sql.functions.udf((date: String) => toTimestamp(date, tsformats))

现在,只需在你的火花代码中使用它。。这是你的数据测试-

val DF = Seq(("02-04-2020 08:02"), ("03-04-2020 10:02"), ("04-04-2020 09:00"), ("04/13/19 9:12"), ("04/14/19 2:13"), ("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")
val tsformats = Seq("MM-dd-yyyy HH:mm", "MM/dd/yy H:mm")
DF.select(UtoTimestamp(tsformats)('DOB)).show

这是输出-

+-------------------+
|           UDF(DOB)|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+

Cherry是为了避免为数据帧中的许多列编写UtoTimestamp(colname(。让我们写一个函数,它接受一个数据帧,所有时间戳列的列表,以及源数据可能编码时间戳的所有可能格式。

它会为您解析所有的时间戳列,并尝试使用不同的格式。。

def WithTimestampParsed(df: DataFrame, tsCols: Seq[String], tsformats: Seq[String]): DataFrame = {
val colSelector = df.columns.map {
c =>
{
if (tsCols.contains(c)) UtoTimestamp(tsformats)(col(c)) alias (c)
else col(c)
}
}

像这样使用

// You can pass as many column names in a sequence to be parsed
WithTimestampParsed(DF, Seq("DOB"), tsformats).show

输出-

+-------------------+
|                DOB|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+

编辑-我看到了最新的spark代码,他们现在也在使用java.time_utils来解析日期和时间戳,从而可以处理超过毫秒的数据。。早些时候,这些函数是基于SimpleDateFormat的(由于这个限制,我以前不依赖spark的to_timestamp(。

所以对于to_date&to_timestamp函数现在非常可靠。。让我们使用它们,而不必编写UDF。。让我们编写一个对Columns进行操作的函数。

def to_timestamp_simple(col: org.apache.spark.sql.Column, formats: Seq[String]): org.apache.spark.sql.Column = {
coalesce(formats.map(fmt => to_timestamp(col, fmt)): _*)
}

有了这个WithTimestampParsed看起来应该像-

def WithTimestampParsedSimple(df: DataFrame, tsCols: Seq[String], tsformats: Seq[String]): DataFrame = {
val colSelector = df.columns.map {
c =>
{
if (tsCols.contains(c)) to_timestamp_simple(col(c), tsformats) alias (c)
else col(c)
}
}
df.select(colSelector: _*)
}

像一样使用

DF.select(to_timestamp_simple('DOB,tsformats)).show
//OR
WithTimestampParsedSimple(DF, Seq("DOB"), tsformats).show

输出看起来像-

+---------------------------------------------------------------------------------------+
|coalesce(to_timestamp(`DOB`, 'MM-dd-yyyy HH:mm'), to_timestamp(`DOB`, 'MM/dd/yy H:mm'))|
+---------------------------------------------------------------------------------------+
|                                                                    2020-02-04 08:02:00|
|                                                                    2020-03-04 10:02:00|
|                                                                    2020-04-04 09:00:00|
|                                                                    2019-04-13 09:12:00|
|                                                                    2019-04-14 02:13:00|
|                                                                    2019-04-15 10:14:00|
|                                                                    2019-04-16 05:15:00|
+---------------------------------------------------------------------------------------+
+-------------------+
|                DOB|
+-------------------+
|2020-02-04 08:02:00|
|2020-03-04 10:02:00|
|2020-04-04 09:00:00|
|2019-04-13 09:12:00|
|2019-04-14 02:13:00|
|2019-04-15 10:14:00|
|2019-04-16 05:15:00|
+-------------------+

我放了一些代码,也许可以在某种程度上帮助您。我试过这个

mport org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import java.sql.Date
import java.util.{GregorianCalendar}

object DateFormats {
val spark = SparkSession
.builder()
.appName("Multiline")
.master("local[*]")
.config("spark.sql.shuffle.partitions", "4") //Change to a more reasonable default number of partitions for our data
.config("spark.app.id", "Multiline")  // To silence Metrics warning
.getOrCreate()
val sc = spark.sparkContext
def main(args: Array[String]): Unit = {
Logger.getRootLogger.setLevel(Level.ERROR)

try {
import spark.implicits._
val DF = Seq(("02-04-2020 08:02"),("03-04-2020 10:02"),("04-04-2020 09:00"),("04/13/19 9:12"),("04/14/19 2:13"),("04/15/19 10:14"), ("04/16/19 5:15")).toDF("DOB")
import org.apache.spark.sql.functions.{to_date, to_timestamp}
val DOBDF = DF.withColumn("Date", to_date($"DOB", "MM/dd/yyyy"))
DOBDF.show()
// todo: my code below
DF
.rdd
.map(r =>{
if(r.toString.contains("-")) {
val dat = r.toString.substring(1,11).split("-")
val calendar = new GregorianCalendar(dat(2).toInt,dat(1).toInt - 1,dat(0).toInt)
(r.toString, new Date(calendar.getTimeInMillis))
} else {
val dat = r.toString.substring(1,9).split("/")
val calendar = new GregorianCalendar(dat(2).toInt + 2000,dat(0).toInt - 1,dat(1).toInt)
(r.toString, new Date(calendar.getTimeInMillis))
}
})
.toDF("DOB","DATE")
.show()
// To have the opportunity to view the web console of Spark: http://localhost:4040/
println("Type whatever to the console to exit......")
scala.io.StdIn.readLine()
} finally {
sc.stop()
println("SparkContext stopped.")
spark.stop()
println("SparkSession stopped.")
}
}
}
+------------------+----------+
|               DOB|      DATE|
+------------------+----------+
|[02-04-2020 08:02]|2020-04-02|
|[03-04-2020 10:02]|2020-04-03|
|[04-04-2020 09:00]|2020-04-04|
|   [04/13/19 9:12]|2019-04-13|
|   [04/14/19 2:13]|2019-04-14|
|  [04/15/19 10:14]|2019-04-15|
|   [04/16/19 5:15]|2019-04-16|
+------------------+----------+

问候

我们可以使用接受答案中提到的coalesce函数。在每个格式不匹配的情况下,to_date都返回null,这使得合并移动到列表中的下一个格式。

但对于to_date,如果您在以yy格式解析日期中的正确年份成分时遇到问题(在日期7-Apr-50中,如果您希望50解析为1950或2050(,请参阅后的stackoverflow

import org.apache.spark.sql.functions.coalesce
// Reference: https://spark.apache.org/docs/3.0.0/sql-ref-datetime-pattern.html
val parsedDateCol: Column = coalesce(
// Four letters of M looks for full name of the Month
to_date(col("original_date"), "MMMM, yyyy"),
to_date(col("original_date"), "dd-MMM-yy"),
to_date(col("original_date"), "yyyy-MM-dd"),
to_date(col("original_date"), "d-MMM-yy")
)
// I have used some dummy dataframe name.
dataframeWithDateCol.select(
parsedDateCol.as("parsed_date")
)
.show()

最新更新