火花流保证特定的开始窗口时间



我正在使用Spark Streaming使用结构化流框架读取Kinesis的数据,我的连接如下

val kinesis = spark
  .readStream
  .format("kinesis")
  .option("streams", streamName)
  .option("endpointUrl", endpointUrl)
  .option("initialPositionInStream", "earliest")
  .option("format", "json")
  .schema(<my-schema>)
  .load

数据来自几个具有唯一ID的物联网设备,我需要通过此ID和时间戳字段上的一个翻滚窗口汇总数据,如下所示:

val aggregateData = kinesis
    .groupBy($"uid", window($"timestamp", "15 minute", "15 minute"))
    .agg(...)

我遇到的问题是,我需要保证每个窗口都在圆圈开始(例如00:00:00,00:15:00等),我也需要保证只有一个包含的行完整的15分钟长窗口将输出到我的水槽中,我目前正在做的是

val query = aggregateData
    .writeStream
      .foreach(postgreSQLWriter)
      .outputMode("update")
      .start()
      .awaitTermination()

ths ths postgresqlwriter是我创建的流媒体,用于将每一行插入PostgreSQL SGBD中。我如何强迫我的窗户要精确15分钟,而开始的开始时间是每个设备唯一ID的15分钟时间戳值?

问题1:要从特定的时间开始,还有一个参数Spark分组功能采用,即"偏移"。通过指定它将在指定时间从一个小时开始的时间开始示例:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute"))

因此,上面的语法将按列1分组,并创建22分钟持续时间的窗口,滑动窗口大小为1分钟,偏移为15分钟

例如,它从:

开始
window1: 8:15(8:00 add 15 minute offset) to 8:37 (8:15 add 22 minutes)
window2: 8:16(previous window start + 1 minute) to 8:38 ( 22 minute size again)

问题2:要仅推动那些具有完整15分钟大小的窗口,请创建一个计数列,该列计算该窗口中具有的事件数量。一旦达到15

计算计数:

dataframe.groupBy($"Column1",window($"TimeStamp","22 minute","1 minute","15 minute")).agg(count*$"Column2").as("count"))

仅包含计数15的Writestream过滤器:

aggregateddata.filter($"count"===15).writeStream.format(....).outputMode("complete").start()

最新更新