如何计算一天从 Kafka 主题获取的消息数量?



我正在从Kafka主题中获取数据并以Deltalake(parquet)格式存储它们。我希望找到特定日期获取的消息数量

我的思考过程:我想使用 Spark 读取以镶木地板格式存储数据的目录,并在特定日期使用".parquet"对文件应用计数。这将返回一个计数,但我不确定这是否是正确的方法。

这种方式正确吗?有没有其他方法可以计算特定日期(或持续时间)从 Kafka 主题获取的消息数量?

我们从主题使用的消息不仅具有键值,而且还具有时间戳等其他信息

可用于跟踪消费者流量。

时间戳时间戳由代理或生产者根据主题配置进行更新。如果主题配置的时间戳类型为 CREATE_TIME,则代理将使用创建者记录中的时间戳,而如果主题配置为LOG_APPEND_TIME,则在追加记录时,代理将使用代理本地时间覆盖时间戳。

  1. 因此,如果您保留时间戳,则可以很好地跟踪每天或每小时的消息速率。

  2. 其他方式,您可以使用一些 Kafka 仪表板,如 Confluent Control Center(许可证价格)或 Grafana(免费)或任何其他工具来跟踪消息流。

  3. 在我们的例子中,在使用消息并存储或处理消息时,我们还将消息的元详细信息路由到 Elastic 搜索,我们可以通过 Kibana 将其可视化。

您可以利用三角洲湖提供的"时间旅行"功能。

在您的情况下,您可以做

// define location of delta table
val deltaPath = "file:///tmp/delta/table"
// travel back in time to the start and end of the day using the option 'timestampAsOf'
val countStart = spark.read.format("delta").option("timestampAsOf", "2021-04-19 00:00:00").load(deltaPath).count()
val countEnd = spark.read.format("delta").option("timestampAsOf", "2021-04-19 23:59:59").load(deltaPath).count()
// print out the number of messages stored in Delta Table within one day
println(countEnd - countStart)

请参阅有关查询表的较旧快照(时间旅行)的文档。

检索此信息而不计算两个版本之间的行的另一种方法是使用增量表历史记录。 这样做有几个优点 - 你不读取整个数据集,你也可以考虑更新和删除,例如,如果你正在做MERGE操作(不可能比较不同版本上的.count,因为更新正在替换实际值,或删除行)。

例如,对于仅追加,以下代码将计算正常append操作写入的所有插入行(对于其他内容,例如合并/更新/删除,我们可能需要查看其他指标):

from delta.tables import *
df = DeltaTable.forName(spark, "ml_versioning.airbnb").history()
.filter("timestamp > 'begin_of_day' and timestamp < 'end_of_day'")
.selectExpr("cast(nvl(element_at(operationMetrics, 'numOutputRows'), '0') as long) as rows")
.groupBy().sum()

最新更新