结构化流 :水印与恰好一次语义



编程指南说,结构化流保证端到端恰好一次使用适当的源/接收器的语义。

但是,我不明白当作业崩溃并且我们应用了水印时,这是如何工作的。

以下是我目前想象它如何工作的示例,请纠正我误解的任何观点。提前感谢!

例:

Spark 作业:在每个 1 小时窗口中计数 # 个事件,并带有 1 小时水印。

消息:

  • A - 时间戳上午 10 点
  • B - 时间戳上午 10:10
  • C - 时间戳上午 10:20
  • X - 时间戳 12pm
  • Y - 时间戳 12:50pm
  • Z - 时间戳晚上 8 点

我们开始作业,从源中读取 A、B、C,作业在上午 10:30 崩溃,然后我们把它们写到我们的接收器。

下午 6 点,作业恢复并知道使用保存的检查点/WAL 重新处理 A、B、C。上午 10-11 点窗口的最终计数为 3。

接下来,它并行读取来自 Kafka、X、Y、Z 的新消息,因为它们属于不同的分区。首先处理 Z,因此最大事件时间戳设置为 8pm。当作业读取 X 和 Y 时,它们现在位于水印后面(8pm - 1 小时 = 7pm(,因此它们将作为旧数据丢弃。晚上 8-9 点的最终计数为 1,作业不会在中午 12-1 点的窗口中报告任何内容。我们丢失了 X 和 Y 的数据。

---结束示例---

这种情况准确吗? 如果是这样,当从 Kafka-Sspark 正常流动时,1 小时的水印可能足以处理延迟/无序数据,但在 Spark 作业关闭/Kafka 连接长时间丢失时则不然。避免数据丢失的唯一选择是使用比您预期的作业更长的水印吗?

水印是小批量期间的固定值。在您的示例中,由于 X、Y 和 Z 是在同一小批量中处理的,因此用于此记录的水印将为上午 9:20。完成后,小批量水印将更新到晚上 7 点。

下面是实现水印功能的功能 SPARK-18124 的设计文档中的引用:

要在基于触发器的执行中计算丢弃边界,我们必须执行以下操作。

在每个触发器中,
  • 在聚合数据的同时,我们还扫描触发器数据中事件时间的最大值
  • 触发完成后,计算水印=MAX(触发前的事件时间,触发前的最大事件时间(-阈值

可能模拟会有更多的描述:

import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime
val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)
val schema = StructType(StructField("vilue", StringType) ::
StructField("timestamp", TimestampType) ::
Nil)
val eventStream = spark
.readStream
.option("sep", ";")
.option("header", "false")
.schema(schema)
.csv(dir.toString)
// Watermarked aggregation
val eventsCount = eventStream
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.count
def writeFile(path: Path, data: String) {
val file = fs.create(path)
file.writeUTF(data)
file.close()
}
// Debug query
val query = eventsCount.writeStream
.format("console")
.outputMode("complete")
.option("truncate", "false")
.trigger(ProcessingTime("5 seconds"))
.start()
writeFile(new Path(dir, "file1"), """
|A;2017-08-09 10:00:00
|B;2017-08-09 10:10:00
|C;2017-08-09 10:20:00""".stripMargin)
query.processAllAvailable()
val lp1 = query.lastProgress
// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// +---------------------------------------------+-----+
// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T10:10:00.000Z",
//     "max" : "2017-08-09T10:20:00.000Z",
//     "min" : "2017-08-09T10:00:00.000Z",
//     "watermark" : "1970-01-01T00:00:00.000Z"
//   },
//   ...
// }

writeFile(new Path(dir, "file2"), """
|Z;2017-08-09 20:00:00
|X;2017-08-09 12:00:00
|Y;2017-08-09 12:50:00""".stripMargin)
query.processAllAvailable()
val lp2 = query.lastProgress
// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+

// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T14:56:40.000Z",
//     "max" : "2017-08-09T20:00:00.000Z",
//     "min" : "2017-08-09T12:00:00.000Z",
//     "watermark" : "2017-08-09T09:20:00.000Z"
//   },
//   "stateOperators" : [ {
//     "numRowsTotal" : 3,
//     "numRowsUpdated" : 2
//   } ],
//   ...
// }
writeFile(new Path(dir, "file3"), "")
query.processAllAvailable()
val lp3 = query.lastProgress
// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+

// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 0,
//   "eventTime" : {
//     "watermark" : "2017-08-09T19:00:00.000Z"
//   },
//   "stateOperators" : [ ],
//   ...
// }
query.stop()
fs.delete(dir, true)

请注意批次 0 如何从水印1970-01-01 00:00:00开始,而批次 1 从水印2017-08-09 09:20:00开始(批次 0 的最大事件时间减去 1 小时(。批次 2,虽然是空的,但使用了水印2017-08-09 19:00:00

首先

处理 Z,因此最大事件时间戳设置为 8pm。

没错。即使可以首先计算Z,也会从当前查询迭代中的最大时间戳中减去水印。这意味着 08:00 PM 将被设置为我们减去水印时间的时间,这意味着 12:00 和 12:50 将被丢弃。

从文档中:

对于从时间 T 开始的特定窗口,引擎将保持状态并允许延迟数据更新状态,直到(引擎看到的最大事件时间 - 延迟阈值> T(


避免数据丢失的唯一选择是使用比您预期的作业更长的水印

不一定。假设您将每个 Kafka 查询要读取的最大数据量设置为 100 个项目。如果您读取小批量,并且从每个分区串行读取,则每个批次的每个最大时间戳可能不是代理中最新消息的最长时间,这意味着您不会丢失这些消息。

最新更新