多个生产者,单一消费者:所有goroutine都处于休眠状态-死锁



在继续工作之前,我一直在遵循一种检查通道中是否有任何东西的模式:

func consume(msg <-chan message) {
for {
if m, ok := <-msg; ok {
fmt.Println("More messages:", m)
} else {
break
}
}
}

这是基于这个视频。这是我的完整代码:

package main
import (
"fmt"
"strconv"
"strings"
"sync"
)
type message struct {
body string
code int
}
var markets []string = []string{"BTC", "ETH", "LTC"}
// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
// for i := 0; i < n; i++ {
var msgToSend = message{
body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
code: 1,
}
fmt.Println("Producing:", msgToSend)
msg <- msgToSend
// }
wg.Done()
}
func receive(msg <-chan message, wg *sync.WaitGroup) {
for {
if m, ok := <-msg; ok {
fmt.Println("Received:", m)
} else {
fmt.Println("Breaking from receiving")
break
}
}
wg.Done()
}
func main() {
wg := sync.WaitGroup{}
msgC := make(chan message, 100)
defer func() {
close(msgC)
}()
for ix, market := range markets {
wg.Add(1)
go produce(ix+1, market, msgC, &wg)
}
wg.Add(1)
go receive(msgC, &wg)
wg.Wait()
}

如果你试图运行它,在我们打印出即将打破的消息之前,我们会在最后陷入僵局。这是有道理的,因为上一次,当chan中没有其他内容时,我们试图提取值,所以我们得到了这个错误。但是这种模式是不可行的if m, ok := <- msg; ok。我如何使这个代码工作&为什么我会得到这个死锁错误(大概这个模式应该工作吧?(。

考虑到一个通道上确实有多个写入程序,您会遇到一些挑战,因为在Go中,通常情况下,实现这一点的简单方法是在一个通道中有一个写入器,然后在发送最后一个数据时让该写入器关闭通道:

func produce(... args including channel) {
defer close(ch)
for stuff_to_produce {
ch <- item
}
}

这种模式有一个很好的特性,即无论你如何离开produce,通道都会关闭,这标志着生产的结束。

您没有使用这种模式——您将一个通道传递给多个goroutine,每个goroutine都可以发送一条消息——因此您需要移动close(当然,也可以使用其他模式(。表达您需要的模式的最简单方法是:

func overall_produce(... args including channel ...) {
var pg sync.WaitGroup
defer close(ch)
for stuff_to_produce {
pg.Add(1)
go produceInParallel(ch, &pg) // add more args if appropriate
}
pg.Wait()
}

pg计数器累加活动生产者。每个都必须调用pg.Done()以指示它是使用ch完成的。整个制作人现在等待所有这些操作完成,然后在退出时关闭频道。

(如果您将内部produceInParallel函数编写为闭包,则不需要显式地将chpg传递给它。您也可以将overallProducer编写为闭包。(

请注意,您的单个消费者循环可能最好使用for ... range构造来表达:

func receive(msg <-chan message, wg *sync.WaitGroup) {
for m := range msg {
fmt.Println("Received:", m)
}
wg.Done()
}

(你提到了在循环中添加一个select的意图,这样你就可以在消息还没有准备好的情况下进行其他计算。如果该代码不能分解成独立的goroutine,那么你实际上需要更高级的m, ok := <-msg结构。(

还要注意的是,receivewg与生产者的等待组pg完全独立,这可能是不必要的,具体取决于您如何构建其他内容。虽然正如所写的那样,在所有生产者完成之前,消费者是无法完成的,但我们希望独立等待生产者完成,这样我们就可以在整个生产者包装中关闭渠道。

试试这段代码,我做了一些修复:

package main
import (
"fmt"
"strconv"
"strings"
"sync"
)
type message struct {
body string
code int
}
var markets []string = []string{"BTC", "ETH", "LTC"}
// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
// for i := 0; i < n; i++ {
var msgToSend = message{
body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
code: 1,
}
fmt.Println("Producing:", msgToSend)
msg <- msgToSend
// }
}
func receive(msg <-chan message, wg *sync.WaitGroup) {
for {
if m, ok := <-msg; ok {
fmt.Println("Received:", m)
wg.Done()
}
}
}
func consume(msg <-chan message) {
for {
if m, ok := <-msg; ok {
fmt.Println("More messages:", m)
} else {
break
}
}
}
func main() {
wg := sync.WaitGroup{}
msgC := make(chan message, 100)
defer func() {
close(msgC)
}()
for ix, market := range markets {
wg.Add(1)
go produce(ix+1, market, msgC, &wg)
}
go receive(msgC, &wg)
wg.Wait()
fmt.Println("Breaking from receiving")
}

只有当main返回时,才可以close(msgC),但同时receive正在等待close信号,这就是发生DeadLock的原因。生成消息后,关闭通道。

package main
import (
"fmt"
"strconv"
"strings"
"sync"
)
type message struct {
body string
code int
}
var markets []string = []string{"BTC", "ETH", "LTC"}
// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
// for i := 0; i < n; i++ {
var msgToSend = message{
body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
code: 1,
}
fmt.Println("Producing:", msgToSend)
msg <- msgToSend
// }
wg.Done()
}
func receive(msg <-chan message, wg *sync.WaitGroup) {
for {
if m, ok := <-msg; ok {
fmt.Println("Received:", m)
} else {
fmt.Println("Breaking from receiving")
break
}
}
wg.Done()
}
func main() {
wg := sync.WaitGroup{}
msgC := make(chan message, 100)
// defer func() {
//  close(msgC)
// }()
for ix, market := range markets {
wg.Add(1)
go produce(ix+1, market, msgC, &wg)
}
wg.Wait() // wait for producer
close(msgC)
wg.Add(1)
go receive(msgC, &wg)
wg.Wait()
}

最新更新