如何在kafka消费者服务(Knative上的KafkaSource)上出现任何http错误代码时重新传递消息



我正在使用Knative Eventing(Kafka Source,它从Kafka Cluster读取数据(来触发消费者服务,如果事件/消息没有成功处理(可能是任何原因(,我想自动重新传递消息。我已经试着阅读了文档和API参考资料,但不清楚这是否可能。

问题:在Knative上,一旦KafkaSource向消费者服务发送消息,消费者服务就会处理请求并发送http错误代码响应(基本上消息处理失败(,并在消费者组中显示为滞后(挂起(。

预期:在这种情况下,希望在处理下一个顺序请求之前重新传递相同的消息。是否可以从Kafka Source重新发送mssage或消费者服务重新发送失败的请求?

非常感谢!!

Srinivas p.

更新

由于Knativev0.24事件源内置了重试。

他们会重试任何错误或2xx以外的状态代码。

此外,Kafka Source尊重订单交付。

Ordered delivery是一个按分区阻塞的使用者,它等待CloudEvent订阅者的成功响应,然后再传递分区的下一条消息。

目前Knative Eventing源不支持重试事件传递。为了获得事件交付,您必须将事件发送到Broker或Channel并配置交付属性,如下所述:

  • 交货规格:https://github.com/knative/eventing/blob/master/pkg/apis/duck/v1/delivery_types.go#L29
  • Broker传递配置:https://knative.dev/docs/eventing/broker/
  • 渠道订阅交付:https://knative.dev/docs/eventing/event-delivery/

例如,您可以将事件从KafkaSource发送到KafkaChannel,然后您可以将订阅者注册到该KafkaChannel,在Subscription规范中定义交付选项。

注意:使用这种方法,您将失去消息排序。

相关内容

  • 没有找到相关文章

最新更新