如何转换时间戳以在 Spark Scala 中获取小时 HH 格式



我有一个csv数据文件,其中包含一列格式为HH:MM:SS的时间 我正在尝试使用 spark-sql 查询 csv,以便获得最繁忙和最不繁忙的入口/出口时间。 谁能帮我解决这个问题?非常感谢!

这是我的CSV文件示例:

emp_name,emp_badge,door_number,date_time,usage_type
Capucine Letellier,28161comp,5,22:36:27,ENTRANCE
Zoé Bonnin de la Lenoir,75976comp,5,01:08:49,ENTRANCE
Henri Potier,66586comp,4,03:13:16,ENTRANCE
Théodore Rodriguez,39004comp,3,20:55:11,ENTRANCE
Christine Bonneau de Rodrigues,23965comp,4,18:45:42,EXIT

您可以使用返回时间戳小时的hour(string date)函数。示例:hour('2009-07-30 12:58:59') = 12hour('12:58:59') = 12

接下来,您可以像这样计算繁忙时间和不太繁忙的时间:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val rawData = spark.read.csv("data.csv")
// Busy Hours calculation
val windowSpecBusyHours = Window.partitionBy("_c4").orderBy(col("transactions").desc)
val busyHours = rawData
.withColumn("hours", hour(col("_c3")))
.groupBy("_c4", "hours").agg(count("*").alias("transactions"))
.withColumn("dense_rank", dense_rank().over(windowSpecBusyHours))
.select("_c4", "hours", "transactions").where(col("dense_rank") === 1)
busyHours.show(false)
// Less Busy Hours calculation
val windowSpecLessBusyHours = Window.partitionBy("_c4").orderBy(col("transactions").asc)
val lessBusyHours = rawData
.withColumn("hours", hour(col("_c3")))
.groupBy("_c4", "hours").agg(count("*").alias("transactions"))
.withColumn("dense_rank", dense_rank().over(windowSpecLessBusyHours))
.select("_c4", "hours", "transactions").where(col("dense_rank") === 1)
lessBusyHours.show(false)

如果您的 csv 包含

  1. 像"HH:MM:ss"这样的字符串:

    val myCsv = spark.read.csv("path/to/csv")
    //this one splits you string by : and takes the first part of it
    val addHour = myCsv.withColumn("hour", split($"date_time", ":")(0))
    
  2. 时间戳格式:

    val myCsv = spark.read.csv("path/to/csv")
    //  Cast it first to timestamp because csv doesn't keep column format, after that format it to HH
    val addHour = myCsv.withColumn("hour", date_format($"date_time".cast("timestamp"), "HH"))
    

我希望这对你有所帮助。

后期编辑: 要对列使用 $ 运算符,您需要导入 Spark 隐式:

import spark.implicits._

现在您可以使用 $ 代替 col("column_name"( 函数。

最新更新