Spark 2.0 timestamp difference in milliseconds using Scala



I am using Spark 2.0 and am looking for a way to achieve the following in Scala:

I need the timestamp difference in milliseconds between the values of two DataFrame columns.

Value_1 = 06/13/2017 16:44:20.044
Value_2 = 06/13/2017 16:44:21.067

Both are of the Timestamp data type.

Note: applying unix_timestamp(Column s) to both values and subtracting works, but it only gets down to second precision, not the millisecond values that are required.
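To see the precision gap concretely, here is a plain-Python sketch (outside Spark) using the example values above: subtracting whole-second Unix timestamps, as unix_timestamp does, drops the sub-second part, while exact millisecond arithmetic keeps it.

```python
from datetime import datetime, timedelta

fmt = "%m/%d/%Y %H:%M:%S.%f"
t1 = datetime.strptime("06/13/2017 16:44:20.044", fmt)
t2 = datetime.strptime("06/13/2017 16:44:21.067", fmt)

# Whole-second epoch subtraction (what unix_timestamp gives): fraction is lost.
seconds_diff = int(t2.timestamp()) - int(t1.timestamp())
print(seconds_diff)   # 1

# Exact millisecond subtraction via timedelta floor division.
millis_diff = (t2 - t1) // timedelta(milliseconds=1)
print(millis_diff)    # 1023
```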

The final query would look like this:

Select timestamp_diff(Value_2, Value_1) from table1

This should return the following output:

1023

milliseconds, where timestamp_diff is the function that computes the difference in milliseconds.

One way is to use Unix epoch time, i.e. the number of milliseconds since January 1, 1970. Below is an example using a UDF that takes two timestamps and returns the difference between them in milliseconds.

import java.sql.Timestamp
import org.apache.spark.sql.functions.{col, udf}

// The later timestamp comes first so the difference is positive.
val timestamp_diff = udf((endTime: Timestamp, startTime: Timestamp) => {
  endTime.getTime() - startTime.getTime()
})
val df = // dataframe with two timestamp columns (col1 and col2)
  .withColumn("diff", timestamp_diff(col("col2"), col("col1")))

Alternatively, you can register the function for use in SQL statements:

val timestamp_diff = (endTime: Timestamp, startTime: Timestamp) => {
  endTime.getTime() - startTime.getTime()
}
spark.udf.register("timestamp_diff", timestamp_diff)
df.createOrReplaceTempView("table1")
val df2 = spark.sql("SELECT *, timestamp_diff(col2, col1) as diff FROM table1")

The same in PySpark:

import datetime

def timestamp_diff(time1: datetime.datetime, time2: datetime.datetime):
    # Floor division of timedeltas stays in exact integer arithmetic;
    # int((time1 - time2).total_seconds() * 1000) can be off by one
    # millisecond because of float rounding.
    return (time1 - time2) // datetime.timedelta(milliseconds=1)

The floor division by a one-millisecond timedelta yields the whole number of milliseconds.

Example usage:

spark.udf.register("timestamp_diff", timestamp_diff)
df.createOrReplaceTempView("table1")
df2 = spark.sql("SELECT *, timestamp_diff(col2, col1) as diff from table1")
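As a local sanity check (plain Python, no Spark session needed), the same subtraction applied to the question's example values yields the expected 1023. The helper is redefined here so the snippet is self-contained, using exact timedelta arithmetic rather than a float multiplication:

```python
import datetime

def timestamp_diff(time1: datetime.datetime, time2: datetime.datetime):
    # Floor division of timedeltas avoids float rounding entirely.
    return (time1 - time2) // datetime.timedelta(milliseconds=1)

fmt = "%m/%d/%Y %H:%M:%S.%f"
value_1 = datetime.datetime.strptime("06/13/2017 16:44:20.044", fmt)
value_2 = datetime.datetime.strptime("06/13/2017 16:44:21.067", fmt)

print(timestamp_diff(value_2, value_1))  # 1023
```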

This is not an optimal solution, since UDFs are generally slow, so you may run into performance problems.

A bit late to the party, but I hope it is still useful.

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Cast to double to keep fractional seconds, then to long epoch milliseconds.
def getUnixTimestamp(col: Column): Column = (col.cast("double") * 1000).cast("long")
df.withColumn("diff", getUnixTimestamp(col("col2")) - getUnixTimestamp(col("col1")))

Of course, you can define a separate method for the difference:

def timestampDiff(col1: Column, col2: Column): Column = getUnixTimestamp(col2) - getUnixTimestamp(col1)
df.withColumn("diff", timestampDiff(col("col1"), col("col2")))
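The idea behind getUnixTimestamp (convert each timestamp to epoch milliseconds, then subtract the two columns) can be sketched outside Spark in plain Python; to_epoch_millis is a hypothetical helper for illustration, not a Spark API, and the sample row from the table below is treated as UTC:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_millis(dt: datetime) -> int:
    # Whole milliseconds since the Unix epoch, in exact integer arithmetic
    # (mirrors (col.cast("double") * 1000).cast("long") applied per column).
    return (dt - EPOCH) // timedelta(milliseconds=1)

fmt = "%Y-%m-%d %H:%M:%S.%f"
min_time = datetime.strptime("1970-01-01 01:00:02.345", fmt).replace(tzinfo=timezone.utc)
max_time = datetime.strptime("1970-01-01 01:00:04.786", fmt).replace(tzinfo=timezone.utc)

print(to_epoch_millis(max_time) - to_epoch_millis(min_time))  # 2441
```

Any fixed timezone offset cancels out in the subtraction, which is why the diff column is offset-independent.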

To make life easier, you can define an overloaded method for Strings that applies a default diff alias:

def timestampDiff(col1: String, col2: String): Column = timestampDiff(col(col1), col(col2)).as("diff")

Now in action:

scala> df.show(false)
+-----------------------+-----------------------+
|min_time               |max_time               |
+-----------------------+-----------------------+
|1970-01-01 01:00:02.345|1970-01-01 01:00:04.786|
|1970-01-01 01:00:23.857|1970-01-01 01:00:23.999|
|1970-01-01 01:00:02.325|1970-01-01 01:01:07.688|
|1970-01-01 01:00:34.235|1970-01-01 01:00:34.444|
|1970-01-01 01:00:34.235|1970-01-01 01:00:34.454|
+-----------------------+-----------------------+

scala> df.withColumn("diff", timestampDiff("min_time", "max_time")).show(false)
+-----------------------+-----------------------+-----+
|min_time               |max_time               |diff |
+-----------------------+-----------------------+-----+
|1970-01-01 01:00:02.345|1970-01-01 01:00:04.786|2441 |
|1970-01-01 01:00:23.857|1970-01-01 01:00:23.999|142  |
|1970-01-01 01:00:02.325|1970-01-01 01:01:07.688|65363|
|1970-01-01 01:00:34.235|1970-01-01 01:00:34.444|209  |
|1970-01-01 01:00:34.235|1970-01-01 01:00:34.454|219  |
+-----------------------+-----------------------+-----+

scala> df.select(timestampDiff("min_time", "max_time")).show(false)
+-----+
|diff |
+-----+
|2441 |
|142  |
|65363|
|209  |
|219  |
+-----+
