如何在此Go频道为空之前取消订阅?



我不明白为什么这段代码不工作:

Playground REPL: https://play.golang.org/p/4PrKFnaTeKp

2009/11/10 23:00:01 published message to 0 subscribers
2009/11/10 23:00:01 published message to 0 subscribers
2009/11/10 23:00:02 client 1 connected
2009/11/10 23:00:02 published message to 1 subscribers
2009/11/10 23:00:02 message received: a message for 1
2009/11/10 23:00:02 receivedMsgs: 1
2009/11/10 23:00:02 message received: a message for all
2009/11/10 23:00:02 receivedMsgs: 2
2009/11/10 23:00:02 published message to 1 subscribers
2009/11/10 23:00:03 published message to 1 subscribers
2009/11/10 23:00:03 message received: a message for 1
2009/11/10 23:00:03 receivedMsgs: 3
2009/11/10 23:00:03 message received: a message for all
2009/11/10 23:00:03 receivedMsgs: 4
2009/11/10 23:00:03 published message to 1 subscribers
2009/11/10 23:00:04 published message to 1 subscribers
2009/11/10 23:00:04 message received: a message for 1
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
main.main()
/tmp/sandbox948233627/prog.go:70 +0xfb
goroutine 6 [chan send]:
main.(*Broker).Publish(0xc000010240, 0x4c9815, 0x11, 0x0, 0x0)
/tmp/sandbox948233627/prog.go:121 +0x2af
main.main.func1(0xc000010240)
/tmp/sandbox948233627/prog.go:34 +0x91
created by main.main
/tmp/sandbox948233627/prog.go:30 +0xc7
goroutine 7 [semacquire]:
sync.runtime_SemacquireMutex(0xc000018054, 0x0, 0x1)
/usr/local/go-faketime/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc000018050)
/usr/local/go-faketime/src/sync/mutex.go:138 +0x105
sync.(*Mutex).Lock(...)
/usr/local/go-faketime/src/sync/mutex.go:81
main.(*Broker).Unsubscribe(0xc000010240, 0xc000062060)
/tmp/sandbox948233627/prog.go:93 +0x1c5
main.main.func2(0xc000010240)
/tmp/sandbox948233627/prog.go:61 +0x1a5
created by main.main
/tmp/sandbox948233627/prog.go:40 +0xf6
package main
import (
"fmt"
"log"
"sync"
"time"
)
type Event struct {
Message  string
Consumer string
}
func NewEvent(msg string, consumer string) Event {
return Event{
Message:  msg,
Consumer: consumer,
}
}
type Broker struct {
consumers map[chan Event]string
mtx       *sync.Mutex
}
func main() {
broker := NewBroker()
go func() {
for {
time.Sleep(time.Second * 1)
broker.Publish(NewEvent("a message for 1", "1"))
broker.Publish(NewEvent("a message for all", ""))
}
}()
time.Sleep(2 * time.Second)
go func() {
ch := broker.Subscribe("1")
receivedMsgs := 0
for {
msg := <-ch
//---> Here I'm sending message to client's browser
//if _, err := w.Write([]byte(fmt.Sprintf("data: %snn", msg))); err != nil {
//  log.Println(err)
//  return
//}
//---> Here I unsubscribe if error in w.Flush() AKA browser was closed
//if err := w.Flush(); err != nil {
//log.Println("browser closed")
//broker.Unsubscribe(ch)
//return
//}
log.Println("message received:", msg.Message)
if receivedMsgs > 3 {
broker.Unsubscribe(ch)
break
}
receivedMsgs++
log.Println("receivedMsgs:", receivedMsgs)
}
}()
select {}
}
func NewBroker() *Broker {
return &Broker{
consumers: make(map[chan Event]string),
mtx:       new(sync.Mutex),
}
}
func (b *Broker) Subscribe(id string) chan Event {
b.mtx.Lock()
defer b.mtx.Unlock()
c := make(chan Event)
b.consumers[c] = id
log.Println(fmt.Sprintf("client %s connected", id))
return c
}
func (b *Broker) Unsubscribe(c chan Event) {
b.mtx.Lock()
defer b.mtx.Unlock()
id := b.consumers[c]
close(c)
delete(b.consumers, c)
log.Printf("client %s killed, %d remainingn", id, len(b.consumers))
}
func (b *Broker) Publish(e Event) {
b.mtx.Lock()
defer b.mtx.Unlock()
pubMsg := 0
for s, id := range b.consumers {
if e.Consumer != "" {
// Push to specific consumer
if id == e.Consumer {
s <- e
pubMsg++
break
}
} else {
// Push to every consumer
e.Consumer = id
s <- e
// Reset unused consumer
e.Consumer = ""
pubMsg++
}
}
log.Printf("published message to %d subscribersn", pubMsg)
}

启动时:

  1. 等待2秒
  2. 订阅broker和
  3. 开始接收消息
  4. 在第三条消息之后,我想打破无限的for,因此go func,但我得到一个错误:

如果我改变这一行:

if receivedMsgs > 2 {

if receivedMsgs > 3 {

我认为问题是因为当我调用break时,通道ch中仍然有一条消息。

我说的对吗?

如何解决这个问题?

我的灵感来自https://gist.github.com/maestre3d/4a42e8fa552694f7c97c4811ce913e23

问题是空的选择。根据本网站的链接描述,当使用空select时,select语句将永远阻塞,因为没有可用的例程来提供任何数据。

为了解决这个问题,我添加了一个带有cancel的上下文,当我取消订阅时,我将停止底部选择并停止第一个goroutine https://play.golang.org/p/tZklz7iiwGd也,我删除了不必要的互斥,因为你正在使用通道,这应该是线程安全的。我还把msg一个接一个地执行,就好像我把它们都放在同一个例程上发送一样,这将导致并发,我可以将msg发送到一个关闭的通道。

最新更新