如何在不分组的情况下每5分钟获取最后1小时的数据



如何每5分钟触发一次并获取最后1小时的数据?我想到了这个,但它似乎没有给我最后1小时内的所有行。我的理由是:

  1. 读取流,

  2. 根据时间戳列和筛选最近1小时的数据

  3. 使用CCD_ 1进行写入/打印。和

  4. 给它加水印,这样它就不会保留所有过去的数据。

    spark.
    readStream.format("delta").table("xxx")
    .withWatermark("ts", "60 minutes")
    .filter($"ts" > current_timestamp - expr("INTERVAL 60 minutes"))
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("5 minutes"))
    .foreachBatch{ (batchDF: DataFrame, batchId: Long) =>  batchDF.collect().foreach(println)
    }
    .start()
    

或者我必须使用Window吗?但如果我使用Window并且不想分组,我似乎无法摆脱GroupBy

spark.
readStream.format("delta").table("xxx")
.withWatermark("ts", "1 hour")
.groupBy(window($"ts", "1 hour"))
.count()
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("5 minutes"))
.foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
print("...entering foreachBatch...n")
batchDF.collect().foreach(println)
}
.start()

如果您想在代码中调度处理,您应该使用外部调度程序(cron等…(或API java.util.Timer,而不是使用火花流每5分钟执行一次火花代码

为什么不应该使用spark流来安排spark代码执行

如果您使用spark流来调度代码,您将遇到两个问题。

第一个问题是,spark流只处理一次数据。因此,每5分钟,只加载一次新记录。您可以考虑通过使用窗口函数绕过此问题,并通过使用collect_list或用户定义的聚合函数检索行的聚合列表,但随后您将遇到第二个问题。

第二个问题,尽管您的治疗将每5分钟触发一次,但只有在有新记录需要处理时,才会执行foreachBatch内的功能。如果在两次执行之间的5分钟间隔内没有新记录,则不会发生任何事情。

总之,火花流并不是为了安排火花代码在特定的时间间隔执行而设计的。

使用java.util.Timer的解决方案

因此,您不应该使用spark流,而应该使用一个调度器,可以是外部的,如cron、oozie、airflow等……或者在您的代码中

如果您需要在代码中执行此操作,可以使用java.util.Timer,如下所示:

import org.apache.spark.sql.functions.{current_timestamp, expr}
import spark.implicits._
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run(): Unit = {
spark.read.format("delta").table("xxx")
.filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
.collect()
.foreach(println)
}
}
t.schedule(task, 5*60*1000L, 5*60*1000L) // 5 minutes
task.run()

最新更新