我正在使用结构化流来读取KAFKA的数据并创建各种聚合指标。我已经使用metrics.properties
启用了石墨接收器。我已经看到旧Spark版本中的应用程序具有与流相关的指标。我看不到与流媒体流相关的流媒体指标。我究竟做错了什么?
例如 - 无法找到未经处理的批处理或运行批处理或最后完成的批次总延迟。
我已经通过设置启用了流量指标:
SparkSession.builder().config("spark.sql.streaming.metricsEnabled",true)
即使那样,我也只得到3个指标:
- driver.spark.streaming.inputrate
- driver.spark.streaming.latency
- driver.spark.streaming.processingrate
这些指标之间存在差距。此外,在启动应用程序后,它确实开始出现。我如何获得Grafana的广泛相关指标?
我检查了StreamingQueryProgress
。我们只能使用该标准通过编程方式创建自定义指标。有什么办法可以消耗我提到的水槽的激发流的指标?
您仍然可以找到其中一些指标。实际启动流线束的查询有两种方法-Last Progress和最近的Progress
他们暴露了详细信息,例如处理的行数,批次的持续时间,批处理中的输入行数。也有一种称为json
的方法,可以在单个GO中获取所有这些信息,该信息可能可用于发送给某些指标收集器。