NATS-JetStream:是否可以明确要求JetStream(重新)发送它在主题foo.*中收到的最后几条消息



主体所说的基本内容。

我想知道JetStream是否可以以允许我们重写主题"的最后15条消息的方式进行查询;foo.*";或者JetStream在主题"上接收到的消息;foo.*";在最后1.5秒内。

如果可能的话,任何代码样本或到代码样本的链接都会受到赞赏。

根据官方文件

  • 可以从某个时间开始抓取消息:在最后1.5秒内

DeliverByStartTime
首次消费邮件时,从该时间或之后的邮件开始。使用者需要指定OptStartTime,即流中开始的时间。它将在该时间当天或之后接收最接近的可用消息。

  • 其他要求,最后15条消息,我认为这是不可能的
  • 有一种方法可以在JetStream中实现与时间相关的检索。

    now := time.Now()
    oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)
    js, _ := nc.JetStream()
    sub, err := js.SubscribeSync(
    "foo.*",
    nats.OrderedConsumer(),
    nats.StartTime(oneAndHalfSecondAgo),
    )
    for {
    msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones
    if err != nil {
    log.Fatal(err)
    }
    // 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here
    // 2. if the message is before now we can push it in an array here
    }
    

    请注意,这种技术虽然有用,但效率很低,因为我们一个接一个地获取消息。

    我们可以使用.Subscribe(((它是异步的(来修改它,但之后我们会遇到另一个问题:

    我们会在当前时刻从JetStream中过量提取,然后我们必须确保我们捕获的缓冲消息确实会返回给JetStream。据我所知,没有配置选项来告诉JetStream关于";MaxTime";。

  • 至于如何";获取最新的N消息";可以修改上面的代码示例,以便他将获得相当高的消息块(例如,最后5秒、10秒或30秒内的所有消息(,并且在获得到当前时刻的所有消息之后,他可以获取最新的"N"消息。

    当然,这种技术并不理想,但似乎并没有其他方法可以做到这一点——至少在撰写本文时没有。

最新更新