我正在尝试实现一个简单的逻辑,其中Producer通过永久for
循环将数据发送到通道ch
,Consumer从通道ch
读取数据。
当生产者在信道quit
上接收到信号时,生产者停止生产并退出永久循环。
代码是这个(另见这个操场(
func main() {
ch := make(chan int)
quit := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go produce(ch, quit, &wg)
go consume(ch)
time.Sleep(1 * time.Millisecond)
fmt.Println("CLOSE")
close(quit)
wg.Wait()
}
func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
for i := 0; ; i++ {
select {
case <-quit:
close(ch)
fmt.Println("exit")
wg.Done()
return //we exit
default:
ch <- i
fmt.Println("Producer sends", i)
}
}
}
func consume(ch chan int) {
for {
runtime.Gosched() // give the opportunity to the main goroutine to close the "quit" channel
select {
case i, more := <-ch:
if !more {
fmt.Println("exit consumer")
return
}
fmt.Println("Consumer receives", i)
}
}
}
如果我在我的机器(一台4核的Mac(上运行这段代码,一切都会很好。如果我在Go PlayGround上尝试相同的代码,它总是超时。我想这是因为Go Playground是一个单核,因此无限循环不会给其他goroutine运行的机会,但我不明白为什么runtime.Gosched()
指令没有任何效果。
为了完成我看到的画面,如果我在Mac上设置GOMAXPROCS=1
,程序仍然可以正常工作,并按预期退出。如果我在Mac上设置GOMAXPROCS=1
并删除runtime.Gosched()
指令,行为就会变得脆弱:有时程序会按预期终止,有时它似乎永远不会退出无限循环。
您创建了一个在实际程序中不应该发生的病态情况,因此调度程序没有进行优化以处理此问题。再加上操场上的假时间实现,在达到超时之前,生产者和消费者的循环太多了。
生产者goroutine正在尽可能快地创建价值,而消费者总是准备好接收它们。使用GOMAPXPROCS=1
,调度器在被迫抢占可用工作以检查主goroutine之前,会花费所有时间在两者之间来回切换,这比操场允许的时间要长。
如果我们为生产者-消费者对添加一些事情,我们可以限制他们独占调度器的时间。例如,向消费者添加time.Sleep(time.Microsecond)
将导致游乐场打印1000个值。这也表明了";准确的";模拟的时间是在操场上,因为这对于处理每条消息需要非零时间的普通硬件来说是不可能的。
虽然这是一个有趣的案例,但它与实际程序几乎没有关系。
需要注意的是,您可以通过通道range
接收所有值,如果可能,您应该始终在goroutine开始时发送defer wg.Done
,您可以在select
case
中发送值,这允许您在发送未准备好时实际取消for select循环,如果您想要;退出消费者";您还需要将WaitGroup
发送给消费者。
https://play.golang.org/p/WyPmpY9pFl7
func main() {
ch := make(chan int)
quit := make(chan bool)
var wg sync.WaitGroup
wg.Add(2)
go produce(ch, quit, &wg)
go consume(ch, &wg)
time.Sleep(50 * time.Microsecond)
fmt.Println("CLOSE")
close(quit)
wg.Wait()
}
func produce(ch chan int, quit chan bool, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; ; i++ {
select {
case <-quit:
close(ch)
fmt.Println("exit")
return
case ch <- i:
fmt.Println("Producer sends", i)
}
}
}
func consume(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for i := range ch {
fmt.Println("Consumer receives", i)
time.Sleep(time.Microsecond)
}
fmt.Println("exit consumer")
return
}