使用 Kafka 使用 Spark 结构化批处理作业管理偏移量



我有一个用例,我正在编写批处理作业

我需要读取Kafka主题并将数据记录到HDFS。我的代码如下所示

val df: DataFrame = spark.read
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load
df.write.
  parquet(buildPathWithCurrentBatchTime())

每次作业读取 Kafka 主题时,它都会从最早的偏移量开始,因此会分批记录相同的消息。如何使我的作业在上一个作业实例读取偏移量之后从偏移量开始读取消息。

我尝试设置检查点位置,组ID,但没有帮助。

我不想使用流式处理查询。我有一个记录来自Kafka Topic的数据的简单用例。我没有任何延迟要求。唯一的要求是日记账中没有任何重复项。这是一个低优先级。如果我使用流式查询,它将一直使用执行器,这是浪费资源。因此我想批量进行

您使用的是批处理查询而不是流式查询。(也许错过了点?简单地将read替换为readStream,将write替换为writeStream即可为您工作。

编辑:正如OP澄清的那样,可以使用一次性触发器,我刚刚更新了代码以使用一次性触发器的结构化流。(免责声明:我没有编译/运行代码,但更改适合结构化流指南文档。

val df: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load
val query = df.writeStream
  .format("parquet")
  .option("path", buildPathWithCurrentBatchTime())
  .trigger(Trigger.Once())
  .start()
query.awaitTermination()

相关内容

  • 没有找到相关文章

最新更新