Spark 结构化流 Kafka 错误 -- 偏移量已更改



我的 Spark 结构化流式处理应用程序运行几个小时,然后失败并出现此错误

java.lang.IllegalStateException: Partition [partition-name] offset was changed from 361037 to 355053, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

当然,每次的偏移量都不同,但第一个总是大于第二个。主题数据不能过期,因为主题的保留期为 5 天,我昨天重新创建了这个主题,但今天再次发生错误。从中恢复的唯一方法是删除检查点。

Spark的Kafka集成指南在选项下提到failOnDataLoss

当数据丢失时是否使查询失败(例如, 主题被删除,或偏移量超出范围)。这可能是一个错误的 报警。当它无法按预期工作时,您可以禁用它。批 如果查询无法从 由于数据丢失而提供偏移量。

但是我找不到任何关于何时可以将其视为误报的更多信息,因此我不知道将failOnDataLoss设置为false是否安全,或者我的集群是否存在实际问题(在这种情况下,我们实际上会丢失数据)。

更新:我已经调查了 Kafka 日志,在所有 Spark 失败的情况下,Kafka 都记录了几条这样的消息(我假设每个 Spark 消费者一条):

INFO [GroupCoordinator 1]: Preparing to rebalance group spark-kafka-...-driver-0 with old generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)

我不再遇到这个问题了。我做了两个更改:

  1. 禁用了 YARN 的动态资源分配(这意味着我必须手动计算执行器等的最佳数量并将它们传递给spark-submit)
  2. 升级到 Spark 2.4.0,也将 Kafka 客户端从 0.10.0.1 升级到 2.0.0

禁用动态资源分配意味着不会在应用程序运行时创建和终止执行程序 (=consumer),从而消除了重新平衡的需要。因此,这很可能是阻止错误发生的原因。

这似乎是旧版本的Spark和spark-sql-kafka库中的一个已知错误。

我发现以下JIRA票证相关:

  • SPARK-28641:微批处理执行提交的偏移量大于可用偏移量
  • SPARK-26267:卡夫卡源可能会重新处理数据
  • KAFKA-7703:调用"seekToEnd"后,KafkaConsumer.position 可能会返回错误的偏移量

简而言之,引用从事这项工作的开发人员:

"这是卡夫卡中的一个已知问题,请参阅KAFKA-7703。此问题在 SPARK-26267 的 2.4.1 和 3.0.0 中修复。请将 Spark 升级到更高版本。另一种可能性是将Kafka升级到2.3.0,其中Kafka端是固定的。

"KAFKA-7703 仅存在于 Kafka 1.1.0 及更高版本中,因此可能的解决方法是使用没有此问题的旧版本。这不会影响Spark 2.3.x及更低版本,因为我们默认使用Kafka 0.10.0.1。

在我们的案例中,我们在HDP 3.1平台上遇到了同样的问题。我们有Spark 2.3.2和spark-sql-kafka库(https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.3.2.3.1.0.0-78),但是使用kafka-clients 2.0.0。这意味着由于后续条件,我们面临此错误:

  • 我们的火花<2.4.1
  • 1.1.0<我们的卡夫卡><2.3.0>

解决方法

我们通过删除包含0偏移量的批号的"偏移量"子文件夹中的检查点文件来解决此问题。

删除这一个文件时,请确保子文件夹"提交"和"偏移量"中的检查点文件中的批号在删除后仍然匹配。

最新更新