将 Spark 累加器与结构化流结合使用



在我的结构化流作业中,我正在更新 updateAcrossEvents 方法中的 Spark 累加器,但当我尝试在我的 StreamingListener 中打印它们时,它们始终为 0。代码如下:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
updateAcrossEvents
)

编辑:更多信息可以更详细地描述问题...

累加器在"updateAcrossEvents"中递增。我有一个 StreamingListener,它在"onQueryProgress"方法中写入累加器的值,但在这种方法中,累加器始终为零!

当我在 updateAcrossEvents 中添加日志语句时,我可以看到这些累加器正在按预期递增。

这仅在我在"群集"模式下运行时发生。在本地模式下,它工作正常,这意味着累加器没有正确分布 - 或类似的东西!

注意:我在网上看到很多答案告诉我执行"操作"。这不是这里的解决方案。这是一个"有状态结构化流式处理"作业。是的,我也在SparkContext中"注册"它们。

在调用操作之前,您是否尝试在转换操作(映射(之外打印它?如果是这样,它将是 0,因为火花正在使用延迟执行。在对数据集调用操作之前,不会调用映射操作中的代码。

df.map{ x => accum.add(x); x }.count
println(accum.value)

这将起作用。

最新更新