如何每5分钟触发一次并获取最后1小时的数据?我想到了这个,但它似乎没有给我最后1小时内的所有行。我的理由是:
-
读取流,
-
根据时间戳列和筛选最近1小时的数据
-
使用CCD_ 1进行写入/打印。和
-
给它加水印,这样它就不会保留所有过去的数据。
spark. readStream.format("delta").table("xxx") .withWatermark("ts", "60 minutes") .filter($"ts" > current_timestamp - expr("INTERVAL 60 minutes")) .writeStream .format("console") .trigger(Trigger.ProcessingTime("5 minutes")) .foreachBatch{ (batchDF: DataFrame, batchId: Long) => batchDF.collect().foreach(println) } .start()
或者我必须使用Window吗?但如果我使用Window并且不想分组,我似乎无法摆脱GroupBy
。
spark.
readStream.format("delta").table("xxx")
.withWatermark("ts", "1 hour")
.groupBy(window($"ts", "1 hour"))
.count()
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("5 minutes"))
.foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
print("...entering foreachBatch...n")
batchDF.collect().foreach(println)
}
.start()
如果您想在代码中调度处理,您应该使用外部调度程序(cron等…(或API java.util.Timer,而不是使用火花流每5分钟执行一次火花代码
为什么不应该使用spark流来安排spark代码执行
如果您使用spark流来调度代码,您将遇到两个问题。
第一个问题是,spark流只处理一次数据。因此,每5分钟,只加载一次新记录。您可以考虑通过使用窗口函数绕过此问题,并通过使用collect_list或用户定义的聚合函数检索行的聚合列表,但随后您将遇到第二个问题。
第二个问题,尽管您的治疗将每5分钟触发一次,但只有在有新记录需要处理时,才会执行foreachBatch
内的功能。如果在两次执行之间的5分钟间隔内没有新记录,则不会发生任何事情。
总之,火花流并不是为了安排火花代码在特定的时间间隔执行而设计的。
使用java.util.Timer的解决方案
因此,您不应该使用spark流,而应该使用一个调度器,可以是外部的,如cron、oozie、airflow等……或者在您的代码中
如果您需要在代码中执行此操作,可以使用java.util.Timer,如下所示:
import org.apache.spark.sql.functions.{current_timestamp, expr}
import spark.implicits._
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run(): Unit = {
spark.read.format("delta").table("xxx")
.filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
.collect()
.foreach(println)
}
}
t.schedule(task, 5*60*1000L, 5*60*1000L) // 5 minutes
task.run()