从最新偏移恢复结构化流



我想创建 Spark 结构化流作业,从 Kafka 源读取消息,写入 Kafka 接收器,失败后将恢复仅读取当前的最新消息。出于这个原因,我不需要为我的工作保留检查点。

但看起来在结构化流中写入 Kafka 接收器时没有禁用检查点的选项。据我了解,即使我在来源上指定:

.option("startingOffsets", "latest")

只有在首次运行流时才会考虑它,失败后流将从检查点恢复。有什么解决方法吗?有没有办法禁用检查点?

解决方法是从代码中删除现有的检查点位置,以便每次它都会开始获取最新的偏移量数据。

import org.apache.hadoop.fs.{FileSystem, Path}
val checkPointLocation="/path/in/hdfs/location"
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(checkPointLocation),true) 
// Delete check point location if exist.
val options = Map(
"kafka.bootstrap.servers"-> "localhost:9092",
"topic" -> "topic_name",
"checkpointLocation" -> checkPointLocation,
"startingOffsets" -> "latest"
)
df
.writeStream
.format("kafka")
.outputMode("append")
.options(options)
.start()
.awaitTermination()

最新更新