Kafka -处理消费者无法处理消息的模式



我正在尝试实现简单的服务,从kafka提取消息,将它们包装在一些数据中并发送到外部服务。

处理消息时处理外部服务不可用的常见模式是什么?

到目前为止,只有当对外部服务的请求成功时,我才手动提交消息。我希望kafka在一段时间后重新发送消息,如果它没有提交,这样处理外部服务故障对消费者是透明的。但我找不到这样做的方法。然而,我很好奇,如果我没有做一些反模式,是否有更好的解决方案。

首先你需要考虑,Kafka是基于pull的。因此,如果您想第二次接收消息,您需要将seek()调整到其偏移量,并将poll()调整到其偏移量。

此外,如果您想停止处理消息,您可以先对它们进行pause()分区,然后再对它们进行resume()分区。参见Consumer JavaDoc: https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

中的"消费流控制"一节

因此,如果您的外部服务关闭,只需暂停并等待,直到它恢复。

相关内容

  • 没有找到相关文章

最新更新