主体所说的基本内容。
我想知道JetStream是否可以以允许我们重写主题"的最后15条消息的方式进行查询;foo.*";或者JetStream在主题"上接收到的消息;foo.*";在最后1.5秒内。
如果可能的话,任何代码样本或到代码样本的链接都会受到赞赏。
根据官方文件
- 可以从某个时间开始抓取消息:在最后1.5秒内
DeliverByStartTime
首次消费邮件时,从该时间或之后的邮件开始。使用者需要指定OptStartTime,即流中开始的时间。它将在该时间当天或之后接收最接近的可用消息。
- 其他要求,最后15条消息,我认为这是不可能的
-
有一种方法可以在JetStream中实现与时间相关的检索。
now := time.Now() oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500) js, _ := nc.JetStream() sub, err := js.SubscribeSync( "foo.*", nats.OrderedConsumer(), nats.StartTime(oneAndHalfSecondAgo), ) for { msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones if err != nil { log.Fatal(err) } // 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here // 2. if the message is before now we can push it in an array here }
请注意,这种技术虽然有用,但效率很低,因为我们一个接一个地获取消息。
我们可以使用.Subscribe(((它是异步的(来修改它,但之后我们会遇到另一个问题:
我们会在当前时刻从JetStream中过量提取,然后我们必须确保我们捕获的缓冲消息确实会返回给JetStream。据我所知,没有配置选项来告诉JetStream关于";MaxTime";。
-
至于如何";获取最新的N消息";可以修改上面的代码示例,以便他将获得相当高的消息块(例如,最后5秒、10秒或30秒内的所有消息(,并且在获得到当前时刻的所有消息之后,他可以获取最新的"N"消息。
当然,这种技术并不理想,但似乎并没有其他方法可以做到这一点——至少在撰写本文时没有。