我有一个csv数据文件,其中包含一列格式为HH:MM:SS的时间 我正在尝试使用 spark-sql 查询 csv,以便获得最繁忙和最不繁忙的入口/出口时间。 谁能帮我解决这个问题?非常感谢!
这是我的CSV文件示例:
emp_name,emp_badge,door_number,date_time,usage_type
Capucine Letellier,28161comp,5,22:36:27,ENTRANCE
Zoé Bonnin de la Lenoir,75976comp,5,01:08:49,ENTRANCE
Henri Potier,66586comp,4,03:13:16,ENTRANCE
Théodore Rodriguez,39004comp,3,20:55:11,ENTRANCE
Christine Bonneau de Rodrigues,23965comp,4,18:45:42,EXIT
您可以使用返回时间戳小时的hour(string date)
函数。示例:hour('2009-07-30 12:58:59') = 12
、hour('12:58:59') = 12
。
接下来,您可以像这样计算繁忙时间和不太繁忙的时间:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val rawData = spark.read.csv("data.csv")
// Busy Hours calculation
val windowSpecBusyHours = Window.partitionBy("_c4").orderBy(col("transactions").desc)
val busyHours = rawData
.withColumn("hours", hour(col("_c3")))
.groupBy("_c4", "hours").agg(count("*").alias("transactions"))
.withColumn("dense_rank", dense_rank().over(windowSpecBusyHours))
.select("_c4", "hours", "transactions").where(col("dense_rank") === 1)
busyHours.show(false)
// Less Busy Hours calculation
val windowSpecLessBusyHours = Window.partitionBy("_c4").orderBy(col("transactions").asc)
val lessBusyHours = rawData
.withColumn("hours", hour(col("_c3")))
.groupBy("_c4", "hours").agg(count("*").alias("transactions"))
.withColumn("dense_rank", dense_rank().over(windowSpecLessBusyHours))
.select("_c4", "hours", "transactions").where(col("dense_rank") === 1)
lessBusyHours.show(false)
如果您的 csv 包含
-
像"HH:MM:ss"这样的字符串:
val myCsv = spark.read.csv("path/to/csv") //this one splits you string by : and takes the first part of it val addHour = myCsv.withColumn("hour", split($"date_time", ":")(0))
-
时间戳格式:
val myCsv = spark.read.csv("path/to/csv") // Cast it first to timestamp because csv doesn't keep column format, after that format it to HH val addHour = myCsv.withColumn("hour", date_format($"date_time".cast("timestamp"), "HH"))
我希望这对你有所帮助。
后期编辑: 要对列使用 $ 运算符,您需要导入 Spark 隐式:
import spark.implicits._
现在您可以使用 $ 代替 col("column_name"( 函数。