How can I handle different date formats in a CSV file when reading it into a DataFrame in Spark with option("dateFormat")?



I have the following input file:

INPUTFILE_CNTRY_CODE|INPUTFILE_CTY_CODE|INPUTFILE_ID|INPUTFILE_LTY_ID|INPUTFILE_CNSM_ID|INPUTFILE_DATE|INPUTFILE_TIME|INPUTFILE_TRDATE
GBR|263|326735246||I034867789V|15/11/30|2015-11-30 00:00:00.000000|2016-22-06
GBR|263|397802068|PC7135361|PC7135361|16/05/20|2016-10-06 11:50:05.000000|2016-22-07

I am trying to read it as follows:

import org.apache.spark.sql.types._

val registeration_schema = StructType(List(
  StructField("INPUTFILE_CNTRY_CODE", StringType),
  StructField("INPUTFILE_CTY_CODE", IntegerType),
  StructField("INPUTFILE_ID", IntegerType),
  StructField("INPUTFILE_LTY_ID", StringType),
  StructField("INPUTFILE_CNSM_ID", StringType),
  StructField("INPUTFILE_DATE", DateType),
  StructField("INPUTFILE_TIME", TimestampType),
  StructField("INPUTFILE_TRDATE", DateType)
))
val registerationDF = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .option("mode", "FAILFAST")
  .schema(registeration_schema)
  .option("dateFormat", "yy/M/d")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv("registration2.csv")

I am getting the following error:

Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.time.format.DateTimeParseException: Text '2016-22-06' could not be parsed at index 2
at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:262)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$doParse$2(UnivocityParser.scala:200)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.parse(UnivocityParser.scala:207)
at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:347)
at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
... 27 more
Caused by: java.time.format.DateTimeParseException: Text '2016-22-06' could not be parsed at index 2

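This happens because the two date columns use different formats, and I specified only one date format when loading the DataFrame. Can someone explain how to handle multiple date formats while reading a CSV into a DataFrame?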

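When loading a CSV, you cannot define more than one format for DateType. You can still get the result you want by combining the date_format() and to_date() functions, available in Spark 2.2+.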
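To see why a single pattern cannot cover both columns, the sketch below applies the question's yy/M/d pattern to one value from each date column. It runs outside Spark and uses the java.time classes that the stack trace points to; the names fmt, dateValue, and trdateValue are only illustrative.

```scala
import java.time.LocalDate
import java.time.format.{DateTimeFormatter, DateTimeParseException}
import scala.util.{Failure, Try}

// Pattern taken from the question's dateFormat option.
val fmt = DateTimeFormatter.ofPattern("yy/M/d")

// One sample value from each date column of the input file.
val dateValue = "15/11/30"     // INPUTFILE_DATE
val trdateValue = "2016-22-06" // INPUTFILE_TRDATE

val parsedDate = Try(LocalDate.parse(dateValue, fmt))
val parsedTrdate = Try(LocalDate.parse(trdateValue, fmt))

// The first value matches the pattern and parses to a date.
println(parsedDate)

// The second value does not match: after the two-digit year the parser expects '/'.
parsedTrdate match {
  case Failure(e: DateTimeParseException) =>
    println(s"could not parse '$trdateValue' at index ${e.getErrorIndex}")
  case other =>
    println(other)
}
```

The failure position lines up with the "could not be parsed at index 2" message in the question, which is why the columns need separate patterns.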

The high-level steps are:

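  1. Define one of the two date columns as a string in the original schema. For this demo I chose to define INPUTFILE_DATE as a string.
  2. Use the date_format() and to_date() functions to parse INPUTFILE_DATE with its own pattern and convert it to a date type.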

Define the original schema with INPUTFILE_DATE --> StringType:

val registeration_schema = StructType(List(
  StructField("INPUTFILE_CNTRY_CODE", StringType),
  StructField("INPUTFILE_CTY_CODE", IntegerType),
  StructField("INPUTFILE_ID", IntegerType),
  StructField("INPUTFILE_LTY_ID", StringType),
  StructField("INPUTFILE_CNSM_ID", StringType),
  StructField("INPUTFILE_DATE", StringType), // read as a string here, converted to a date afterwards
  StructField("INPUTFILE_TIME", TimestampType),
  StructField("INPUTFILE_TRDATE", DateType)
))
val registerationDF = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .option("mode", "FAILFAST")
  .schema(registeration_schema)
  .option("dateFormat", "yyyy-dd-MM") // now applies only to INPUTFILE_TRDATE
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv("registration2.csv")

The core part of this solution is:

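import org.apache.spark.sql.functions.{col, date_format, to_date}
// Parse the yy/MM/dd string, render it as yyyy-dd-MM, then parse that text back into a date column.
val targetDF = registerationDF.withColumn("INPUTFILE_DATE", to_date(date_format(to_date(col("INPUTFILE_DATE"), "yy/MM/dd"), "yyyy-dd-MM"), "yyyy-dd-MM"))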
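The round trip through date_format() is not strictly required, because to_date() with a pattern already returns a date column. As a variation on the approach above, and not the answer's own code, the sketch below reads both date columns as strings and converts each with its own pattern. The in-memory rows stand in for registration2.csv, and the names rawDF and parsedDF are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, to_date}

// In spark-shell, skip the next two lines: `spark` and its implicits are already in scope.
val spark = SparkSession.builder().master("local[1]").appName("date-format-sketch").getOrCreate()
import spark.implicits._

// Stand-in for the two date columns of the input file, both kept as strings.
val rawDF = Seq(
  ("15/11/30", "2016-22-06"),
  ("16/05/20", "2016-22-07")
).toDF("INPUTFILE_DATE", "INPUTFILE_TRDATE")

// Convert each column with the pattern that matches its own layout.
val parsedDF = rawDF
  .withColumn("INPUTFILE_DATE", to_date(col("INPUTFILE_DATE"), "yy/MM/dd"))
  .withColumn("INPUTFILE_TRDATE", to_date(col("INPUTFILE_TRDATE"), "yyyy-dd-MM"))

parsedDF.printSchema()
parsedDF.show()
```

With this variation the dateFormat option is not needed for either column, at the cost of one extra withColumn call per date column.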

Final result:

scala> targetDF.printSchema()
root
|-- INPUTFILE_CNTRY_CODE: string (nullable = true)
|-- INPUTFILE_CTY_CODE: integer (nullable = true)
|-- INPUTFILE_ID: integer (nullable = true)
|-- INPUTFILE_LTY_ID: string (nullable = true)
|-- INPUTFILE_CNSM_ID: string (nullable = true)
|-- INPUTFILE_DATE: date (nullable = true)
|-- INPUTFILE_TIME: timestamp (nullable = true)
|-- INPUTFILE_TRDATE: date (nullable = true)

scala> targetDF.show()
+--------------------+------------------+------------+----------------+-----------------+--------------+-------------------+----------------+
|INPUTFILE_CNTRY_CODE|INPUTFILE_CTY_CODE|INPUTFILE_ID|INPUTFILE_LTY_ID|INPUTFILE_CNSM_ID|INPUTFILE_DATE|     INPUTFILE_TIME|INPUTFILE_TRDATE|
+--------------------+------------------+------------+----------------+-----------------+--------------+-------------------+----------------+
|                 GBR|               263|   326735246|            null|      I034867789V|    2015-11-30|2015-11-30 00:00:00|      2017-10-06|
|                 GBR|               263|   397802068|       PC7135361|        PC7135361|    2016-05-20|2016-10-06 11:50:05|      2017-10-07|
+--------------------+------------------+------------+----------------+-----------------+--------------+-------------------+----------------+
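
The INPUTFILE_TRDATE values in this output deserve a second look: 2016-22-06 appears as 2017-10-06. That is the result of a lenient year-month-day parse, where month 22 rolls over into October of the following year, whereas the yyyy-dd-MM pattern read strictly gives 2016-06-22. The sketch below reproduces both readings with plain JDK classes outside Spark. It is only a guess that the displayed values came from lenient parsing; the names trdate, yearDayMonth, and rolledOver are illustrative.

```scala
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val trdate = "2016-22-06"

// Reading the value as year-day-month, the pattern used in the answer's dateFormat option.
val yearDayMonth = LocalDate.parse(trdate, DateTimeFormatter.ofPattern("yyyy-dd-MM"))

// Reading the same text leniently as year-month-day: month 22 overflows into the next year.
val lenientFormat = new SimpleDateFormat("yyyy-MM-dd") // lenient by default
val rolledOver = lenientFormat.format(lenientFormat.parse(trdate))

println(yearDayMonth) // 2016-06-22
println(rolledOver)   // 2017-10-06
```

If the loaded dates look shifted like this, it is worth checking which pattern and which parser mode were actually in effect when the file was read.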
