如何从Kafka主题获取记录总数并保存到HDFS中?



All,

我正在努力将来自 Kafka 的数据转储到 HDFS 中。我能够使用数据,并希望从 Kafka 获取记录总数并作为文件保存到 HDFS 中,以便我可以使用该文件进行验证。我能够在控制台中打印记录,但我不确定如何创建总数文件?

查询从 Kafka 拉取记录:

Dataset ds1=ds.filter(args[5]);
StreamingQuery query = ds1
.coalesce(10)
.writeStream()
.format("parquet")
.option("path", path.toString())
.option("checkpointLocation", args[6] + "/checkpoints" + args[2])
.trigger(Trigger.Once())
.start();
try {
query.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
}   

以及我为获取记录并在控制台中打印而编写的代码:

Dataset stream=ds1.groupBy("<column_name>").count();//实际上,我想在不使用 GroupBy 的情况下获取计数,我已经尝试long stream=ds1.count()但遇到了错误。

StreamingQuery query1=stream.coalesce(1)
.writeStream()
.format("csv")
.option("path", path + "/record")
.start();
try {
query1.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
System.exit(1);
} 

这是行不通的,你能帮我解决这个问题吗?

主题中任何时候的记录数都是一个移动的目标。

您需要使用旧的 Spark 流式处理来查找每个 Spark partiton 批处理的记录数,然后使用Accumulator来计算处理的所有记录,但这将是您可以获得的最接近的记录。

Spark + Kafka声称只有一次处理语义,所以我建议你专注于错误捕获和监控,而不仅仅是计数验证。

最新更新