PySpark-使用本机Spark函数将长Epoch(以毫秒为单位)转换为TimestampType



我使用PySpark库读取JSON文件、处理数据并写回镶木地板文件。

传入数据有一个从epoch开始测量的日期字段(以毫秒为单位(。例如,1541106106796表示:Thursday, November 1, 2018 9:01:46.796 PM

一个有效的解决方案使用Pythondatetime库:

def format_datetime(ts):
return datetime.fromtimestamp(ts/1000.0)
...
get_timestamp = udf(lambda x: format_datetime(int(x)),TimestampType())
df = df.withColumn("timestamp", get_timestamp(df.ts))

有没有只使用本机Spark函数的解决方案?

使用from_unixtime并从时间戳中提取毫秒,然后在末尾添加,最后强制转换为timestamp类型。

df.show()
#+-------------+
#|           ts|
#+-------------+
#|1541106106796|
#+-------------+
df.withColumn("ts1",expr('concat_ws(".",from_unixtime(substring(ts,1,length(ts)-3),"yyyy-MM-dd HH:mm:ss"),substring(ts,length(ts)-2,length(ts)))').cast("timestamp")).
show(10,False)
#+-------------+-----------------------+
#|ts           |ts1                    |
#+-------------+-----------------------+
#|1541106106796|2018-11-01 16:01:46.796|
#+-------------+-----------------------+

要创建unixtime,请使用unix_timestampregexp_extract函数。

Example:

df.show(10,False)
#+-----------------------------------------+
#|sample                                   |
#+-----------------------------------------+
#|Thursday, November 1, 2018 9:01:46.796 PM|
#+-----------------------------------------+
df.withColumn("ts",concat_ws('',unix_timestamp(col("sample"),"E, MMMM d, yyyy hh:mm:ss.SSS a"),regexp_extract(col("sample"),"\.(.*)\s+",1))).
show(10,False)
#+-----------------------------------------+-------------+
#|sample                                   |ts           |
#+-----------------------------------------+-------------+
#|Thursday, November 1, 2018 9:01:46.796 PM|1541124106796|
#+-----------------------------------------+-------------+

相关内容

  • 没有找到相关文章

最新更新