pubsub中subscriber的重试设置是什么?如何在spring应用程序中正确设置这些设置



我有一个spring服务子分区,用于接收来自谷歌云pubsub(pulling)中某个主题的消息。它总体上工作正常。但我想对收到的信息有更多的控制权。我的服务有时需要确认消息,或者让确认截止日期过去,这样我以后就会再次收到消息。在使用单个消息进行测试时,nacked消息几乎立即返回给我,而那些我根本不确认或nack的消息,会在默认为ackDeadline的10秒后返回。我希望它推迟重复使用这些信息。我认为重试设置是为这种情况设计的。我还应该提到,我目前正在使用模拟器进行本地测试,并从代码中创建订阅。我正在使用PubSubAdmin进行管理。

根据这个文档,我试图在我的配置文件中设置这些配置。像这样:

spring.cloud.gcp.pubsub.subscriber.retry.initial-retry-delay-second: 4
spring.cloud.gcp.pubsub.subscriber.retry.max-attempts: 5
spring.cloud.gcp.pubsub.subscriber.retry.initial-rpc-timeout-seconds: 4
spring.cloud.gcp.pubsub.subscriber.retry.max-rpc-timeout-seconds: 8
spring.cloud.gcp.pubsub.subscriber.retry.max-retry-delay-seconds: 7
spring.cloud.gcp.pubsub.subscriber.retry.total-timeout-seconds: 3000

但这对消息再次出现的时间没有影响。我是否理解错误重试设置的含义?也许它们只有在存在一些连接问题的情况下才会生效,但在nacking或缺乏确认的情况下不会生效?还是在使用deploymentManager创建订阅时必须设置该设置,而不允许从代码中进行设置?或者在(开发)配置文件中设置它们可能无法使用PubSubAdmin?谢谢你的建议!

edit:我希望第一次重试在5秒后,但下一次重试在10秒后,等等。另外,我想设置最大重试次数。所以我不感兴趣的是将ackDeadline设置为一个更大的数字。

edit2:为什么nacking:其中一个服务(我们称之为桥接器)正在订阅消息,必须验证每个消息,如果可以,则将其传递给另一个外部系统。这个服务充当了这个系统的桥梁,因为我们不能直接在第二个系统上工作。在某些情况下,消息需要一些额外的信息,因此网桥会尝试在其他地方获取它(包括许多微服务),有时会发生这种情况,此时此刻还没有额外的信息。因此,第一个想法是不要确认消息,让它稍后再次出现。但我不想在接下来的7天里每10秒问一次(使用ackDeadline),我只想尝试几次,如果2小时后不出现,它就永远不会出现。所以我们尝试了nack,并希望这些重试设置可以帮助管理重新发送。但由于他们没有,我想唯一的办法就是自己在桥上建立一些管理这些信息的东西。也许可以存储消息ID和重试次数,这样我就可以在例如5次之后确认,并将消息推送到另一个主题以不同的方式处理它。或者有什么更好的解决方案?

Cloud Pub/Sub不为特定消息提供指数退避。除了告诉Cloud Pub/Sub您无法处理消息之外,nack没有任何作用。

如果你要记录你为什么需要窃听这些信息,我可以提供一个更有用的答案。如果您无法处理当前负载,可以使用此处描述的流控制选项来减少发送到客户端的未处理消息或字节数。如果你有已知不好的消息,你应该在推到另一个单独处理的死信主题后确认它们。

对编辑2的响应:

如果在这种情况下,补充消息的操作可能会失败,请在服务中实现您自己想要的任何回退机制。在构建订阅服务器时设置最大ack扩展期(java中的setMaxAckExtensionPeriod),以确保客户端将每条消息的ack截止日期延长到足以满足重试链的长度。

编辑2

请注意,Pub/Sub现在已经内置了对死信的支持。

您可以使用PubSubSubscriberTemplate.modifyAckDeadline()以编程方式延长通过pull检索的一批消息的截止日期。每个单独的AcknowledgeablePubsubMessage也有一个modifyAckDeadline()方法,如果您只需要为少数掉队者延长截止日期的话。

如果该特定订阅上的所有消息都需要更长的确认期,则可以通过编辑订阅并更新"确认截止日期"字段,在GCP控制台中设置默认值。

最新更新