我将Spark 2.0.0与Kafka 0.10.2一起使用。
我有一个正在处理来自 Kafka 的消息的应用程序,并且是一个长时间运行的作业。
我不时在日志中看到以下消息。我了解如何增加超时和所有内容,但我想知道的是,我确实有此错误,如何从中恢复?
错误使用者协调器:偏移提交失败。 org.apache.kafka.clients.consumer.CommitFailedException:
无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。
这意味着对 poll() 的后续调用之间的时间比配置的 session.timeout.ms 长,这通常意味着轮询循环花费了太多时间处理消息。
您可以通过增加会话超时或使用 max.poll.records 减少 poll() 中返回的批处理的最大大小来解决此问题。
这不是关于我如何逃避此错误,而是一旦发生如何处理它
背景:在正常情况下,我不会看到提交错误,但是如果我确实得到一个,我应该能够从中恢复。我正在使用AT_LEAST_ONCE
设置,所以我对重新处理一些消息非常满意。 我正在运行Java并使用带有手动提交的DirectKakfaStreams。
创建流:
JavaInputDStream<ConsumerRecord<String, String>> directKafkaStream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
提交偏移量
((CanCommitOffsets) directKafkaStream.inputDStream()).commitAsync(offsetRanges);
我对这种情况的理解是,您使用 Kafka Direct Stream 集成(使用 Spark Streaming + Kafka Integration Guide (Kafka broker 版本 0.10.0 或更高版本)中所述spark-streaming-kafka-0-10_2.11
模块)。
如错误消息中所述:
无法完成提交,因为组已重新平衡并将分区分配给另一个成员。
Kafka 管理使用者使用的主题分区,因此直接流将创建一个使用者池(在单个使用者组内)。
与任何消费者群体一样,您应该期望重新平衡(引用第4章。"Kafka Consumers - Reading Data from Kafka: The Definitive Guide):
使用者组中的使用者共享他们订阅的主题中分区的所有权。当我们向组添加新的使用者时,它开始使用以前由另一个使用者使用的分区中的消息。当使用者关闭或崩溃时,也会发生同样的事情,它离开组,并且它曾经使用的分区将被剩余的使用者之一使用。当使用者组正在使用的主题被修改时,也会将分区重新分配给使用者,例如,如果管理员添加新分区。
在很多情况下,可能会发生重新平衡,并且应该在预期之中。而你做到了。
你问:
我怎样才能从中恢复?这不是关于我如何逃脱此错误,而是一旦发生如何处理它?
我的答案是使用另一种CanCommitOffsets
方法:
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit
这使您可以访问Kafka的OffsetCommitCallback:
OffsetCommitCallback是一个回调接口,用户可以实现该接口以在提交请求完成时触发自定义操作。回调可以在任何调用 poll() 的线程中执行。
我认为onComplete
可以让您掌握异步提交是如何完成的并采取相应的行动。
我无法为您提供太多帮助的是,当无法提交某些偏移量时,如何还原 Spark Streaming 应用程序中的更改。我认为这需要跟踪偏移量并接受某些偏移量无法提交并重新处理的情况。