出现错误时如何重试 kafka 消息 - 春季云流



我对卡夫卡很陌生。我正在使用春云流卡夫卡来生产和消费

@StreamListener(Sink.INPUT)
public void process(Order order) {
try {
// have my message processing 
}
catch( exception e ) {
//retry here that record..
}
}
}

只是想知道如何实现重试?对此的任何帮助都非常感谢

Hy

有多种方法可以处理"重试",这取决于您遇到的事件类型。

对于基本问题,kafka 框架将重试,以便您从错误条件中恢复,例如,在网络停机时间较短的情况下,消费者和生产者 API 会实现自动重试。

特别是 kafka 支持"内置生产者/消费者重试",以正确处理各种错误而不会丢失消息,但作为开发人员,您仍然必须能够使用您提到的 try-catch 块处理其他类型的错误。

卡夫卡中的错误可以分为以下几类:

  • (生产者和消费者方面(不可重试的代理错误,例如有关消息大小的错误、授权错误等 ->您必须在应用程序的"设计阶段"处理它们。
  • (
  • 生产者端(在将消息发送到代理之前发生的错误(例如,序列化错误(,>必须在运行时应用程序执行中处理这些错误
  • (生产者和消费者端当生产者用尽所有重试尝试或当 由于在重试时使用所有内存来存储消息,生产者使用的可用内存已填满限制 ->您应该处理这些错误。

关于"如何重试"的另一个注意事项是如何正确处理自动提交选项设置为 false 时的提交顺序。

正确获取提交顺序的一种常见且简单的模式是使用单调递增的序列号。每次提交时增加序列号,并在提交时将序列号添加到提交函数。

当您准备发送重试时,请检查 回调得到的提交序列号等于实例 变量;如果是,则没有较新的提交,可以安全地重试。如果 实例序列号较高,请勿重试,因为 已发送较新的提交。

最新更新