我得到了使用来自 RabbitMQ 队列的消息的代码。如果一段时间没有收到消息,它应该会失败。
msgs, err := ch.Consume(
c.ResponseQueue, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
...
loop:
for timeout := time.After(time.Second); ; {
select {
case <-timeout:
log.Printf("Failed to receive response for action %+vn Payload: %+vnError: %+vn", action, body, err)
return errors.New("Failed to receive response for action")
default:
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
return err
}
ch.Ack(d.DeliveryTag, false)
break loop
}
}
}
}
我从 RabbitMQ 手册中获取了消费代码,并尝试了一些实现超时的建议。我知道如何在Java中做到这一点,但不能在Golang中重复它。
提前谢谢。
更新:
将选择更改为:
c1 := make(chan error, 1)
go func() {
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
c1 <- err
}
ch.Ack(d.DeliveryTag, false)
c1 <- nil
}
}
}()
select {
case <-time.After(defaultTimeout * time.Second):
log.Printf("Failed to receive response for action %+vn Payload: %+vnError: %+vn", action, body, err)
return errors.New("Failed to receive response in time for action")
case err := <-c1:
failOnError(err, "Failed to process response")
}
return err
现在它按预期工作 - 如果它没有收到带有正确 corellationId 的消息,它将因超时而失败。谢谢大家的帮助。
您的循环有一个包含 2 种情况的select
:超时和default
分支。进入循环后,超时不会触发,因此执行default
分支。
default
分支包含msgs
通道上的for range
,该通道不断从通道接收,直到通道关闭(并且已从通道接收所有值(。通常这不应该发生,因此不会重新访问超时情况(仅当发生某些错误并且msgs
已关闭时(。
相反,在循环中使用具有 2 种情况的select
,一种超时,另一种仅从msgs
接收单个值。如果收到消息,请重新启动超时。对于可重新启动的计时器,请使用time.Timer
。
timeout := time.Second
timer := time.NewTimer(timeout)
for {
select {
case <-timer.C:
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg)
// Reset timer: it must be stopped first
// (and drain its channel if it reports false)
if !timer.Stop() {
<-timer.C
}
timer.Reset(timeout)
}
}
查看此 Go Playground 示例以查看它的实际效果。
请注意,如果在收到消息后不需要重置计时器,只需注释掉重置器代码即可。此外,如果不需要重置,time.After()
更简单:
timeout := time.After(time.Second)
for {
select {
case <-timeout:
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg, time.Now())
}
}
在Go Playground上试试这个。
最后一点:如果在超时发生之前中断循环,后台的计时器不会立即释放(仅在超时发生时(。如果您经常需要此操作,则可以使用context.WithTimeout()
获取context.Context
和取消函数,您可以在返回之前立即调用该函数以释放计时器资源(最好是延迟(。
这是它的样子:
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
for {
select {
case <-ctx.Done():
fmt.Println("timeout, returning")
return
case msg := <-msgs:
fmt.Println("received message:", msg, time.Now())
}
}
在Go Playground上试试这个。
将选择更改为:
c1 := make(chan error, 1)
go func() {
for d := range msgs {
if corrID == d.CorrelationId {
err = json.Unmarshal([]byte(uncompress(d.Body)), &v)
if err != nil {
c1 <- err
}
ch.Ack(d.DeliveryTag, false)
c1 <- nil
}
}
}()
select {
case <-time.After(defaultTimeout * time.Second):
log.Printf("Failed to receive response for action %+vn Payload: %+vnError: %+vn", action, body, err)
return errors.New("Failed to receive response in time for action")
case err := <-c1:
failOnError(err, "Failed to process response")
}
return err