我正在尝试实现简单的服务,从kafka提取消息,将它们包装在一些数据中并发送到外部服务。
处理消息时处理外部服务不可用的常见模式是什么?
到目前为止,只有当对外部服务的请求成功时,我才手动提交消息。我希望kafka在一段时间后重新发送消息,如果它没有提交,这样处理外部服务故障对消费者是透明的。但我找不到这样做的方法。然而,我很好奇,如果我没有做一些反模式,是否有更好的解决方案。首先你需要考虑,Kafka是基于pull的。因此,如果您想第二次接收消息,您需要将seek()
调整到其偏移量,并将poll()
调整到其偏移量。
此外,如果您想停止处理消息,您可以先对它们进行pause()
分区,然后再对它们进行resume()
分区。参见Consumer JavaDoc: https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
因此,如果您的外部服务关闭,只需暂停并等待,直到它恢复。