Spark 存储非流式处理(批量读写)方法中的 kafka 偏移检查点



我有一个用例,我想处理来自 kafka 的特定偏移集并存储在 cassandra 中,并维护检查点,以便在发生故障时,我可以从检查点重新启动应用程序。由于它不是流媒体应用程序 -

val startingOffsets = """{"topic_name": { "0": 33190, "1": 557900, "2": -2} }"""
val endingOffsets =  """{"topic_name": { "0": 33495, "1": 559905, "2": -1} }"""
val df = sparkSession
.read
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "kafka.brokers".getConfigValue) 
.option("subscribe", "kafka.devicelocationdatatopic".getConfigValue) 
.option("startingOffsets", "kafka.startingOffsets".getConfigValue)
.option("endingOffsets", "kafka.endingOffsets".getConfigValue)
.option("failOnDataLoss", "false") // any failure regarding data loss in topic or else, not supposed to fail, it has to continue...
.option("maxOffsetsPerTrigger", "3") // any change please remove the checkpoint folder
.load()

而写法是——

df
.write
.cassandraFormat(
"tbl_name",
"cassandra.keyspace".getConfigValue,
"cassandra.clustername".getConfigValue )
.mode(SaveMode.Append)
.option("checkpointLocation", checkpointDirectory)
.save()

我试过这个选项不起作用-

.option("checkpointLocation", checkpointDirectory)

在检查时,我发现了一些博客,这些博客可以创建检查点目录 -

sc.setCheckpointDir("/batchProcessKafka")

和写作保持设置为-

df.checkpoint(true)

但这节省了整个RDD,而我只想跟踪我的kafka偏移量。有什么建议吗?

不确定现在是否有帮助。我一直在为 Kafka 寻找类似的方法,并认为 spark 可以选择只触发一次流,这种流像批处理一样运行,让 Spark 为您管理检查点/偏移量。 下面是一个 PySpark 示例 -

df.writeStream.trigger(once=True).format("parquet").option("checkpointLocation", <your checkpoint location>).foreachBatch(<your func here>)

欲了解更多信息 - https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

最新更新