Spark SQL -如何查找每小时的事务总数



例如,如果我有一个表,其中transaction numbertransaction date[作为timestamp]列,我如何找出hourly basis上的事务总数?

是否有任何Spark sql函数可用于这种范围计算?

可以使用from_unixtime函数。

val sqlContext = new SQLContext(sc)
import org.apache.spark.sql.functions._
import sqlContext.implicits._
val df = // your dataframe, assuming transaction_date is timestamp in seconds
df.select('transaction_number, hour(from_unixtime('transaction_date)) as 'hour)
      .groupBy('hour)
      .agg(count('transaction_number) as 'transactions)
结果:

+----+------------+
|hour|transactions|
+----+------------+
|  10|        1000|
|  12|        2000|
|  13|        3000|
|  14|        4000|
|  ..|        ....|
+----+------------+

这里我试图给一些指针的方法,相当完整的代码,请参阅这个

时间间隔字面值:使用间隔字面值,可以从日期或时间戳值中执行任意时间量的减法或加法。当您想要在一个固定的时间点上添加或减去一个时间段时,这种表示非常有用。例如,用户现在可以轻松地表达如下查询 "查找过去一小时内发生的所有事务" 。使用以下语法构造间隔文字:[sql]INTERVAL值单位[/sql]


下面是python中的方法。您可以修改下面的示例以符合您的要求,即交易日期开始时间,结束时间。在你的例子中,不是id,而是交易号。
# Import functions.
from pyspark.sql.functions import *
# Create a simple DataFrame.
data = [
  ("2015-01-01 23:59:59", "2015-01-02 00:01:02", 1),
  ("2015-01-02 23:00:00", "2015-01-02 23:59:59", 2),
  ("2015-01-02 22:59:58", "2015-01-02 23:59:59", 3)]
df = sqlContext.createDataFrame(data, ["start_time", "end_time", "id"])
df = df.select(
  df.start_time.cast("timestamp").alias("start_time"),
  df.end_time.cast("timestamp").alias("end_time"),
  df.id)
# Get all records that have a start_time and end_time in the
# same day, and the difference between the end_time and start_time
# is less or equal to 1 hour.
condition = 
  (to_date(df.start_time) == to_date(df.end_time)) & 
  (df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()
+———————+———————+—+
|start_time           |            end_time |id |
+———————+———————+—+
|2015-01-02 23:00:00.0|2015-01-02 23:59:59.0|2  |
+———————+———————+—+

使用此方法,您可以应用组函数来查找您的案例中的事务总数。

上面是python代码,scala呢?

上面使用的expr函数在scala中也可用

还可以查看spark-scala- dateff -two-columns-by-hour- minute它描述如下…

import org.apache.spark.sql.functions._
    val diff_secs_col = col("ts1").cast("long") - col("ts2").cast("long")
    val df2 = df1
      .withColumn( "diff_secs", diff_secs_col )
      .withColumn( "diff_mins", diff_secs_col / 60D )
      .withColumn( "diff_hrs",  diff_secs_col / 3600D )
      .withColumn( "diff_days", diff_secs_col / (24D * 3600D) )

相关内容

  • 没有找到相关文章