在继续工作之前,我一直在遵循一种检查通道中是否有任何东西的模式:
func consume(msg <-chan message) {
for {
if m, ok := <-msg; ok {
fmt.Println("More messages:", m)
} else {
break
}
}
}
这是基于这个视频。这是我的完整代码:
package main
import (
"fmt"
"strconv"
"strings"
"sync"
)
type message struct {
body string
code int
}
var markets []string = []string{"BTC", "ETH", "LTC"}
// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
// for i := 0; i < n; i++ {
var msgToSend = message{
body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
code: 1,
}
fmt.Println("Producing:", msgToSend)
msg <- msgToSend
// }
wg.Done()
}
func receive(msg <-chan message, wg *sync.WaitGroup) {
for {
if m, ok := <-msg; ok {
fmt.Println("Received:", m)
} else {
fmt.Println("Breaking from receiving")
break
}
}
wg.Done()
}
func main() {
wg := sync.WaitGroup{}
msgC := make(chan message, 100)
defer func() {
close(msgC)
}()
for ix, market := range markets {
wg.Add(1)
go produce(ix+1, market, msgC, &wg)
}
wg.Add(1)
go receive(msgC, &wg)
wg.Wait()
}
如果你试图运行它,在我们打印出即将打破的消息之前,我们会在最后陷入僵局。这是有道理的,因为上一次,当chan中没有其他内容时,我们试图提取值,所以我们得到了这个错误。但是这种模式是不可行的if m, ok := <- msg; ok
。我如何使这个代码工作&为什么我会得到这个死锁错误(大概这个模式应该工作吧?(。
考虑到一个通道上确实有多个写入程序,您会遇到一些挑战,因为在Go中,通常情况下,实现这一点的简单方法是在一个通道中有一个写入器,然后在发送最后一个数据时让该写入器关闭通道:
func produce(... args including channel) {
defer close(ch)
for stuff_to_produce {
ch <- item
}
}
这种模式有一个很好的特性,即无论你如何离开produce
,通道都会关闭,这标志着生产的结束。
您没有使用这种模式——您将一个通道传递给多个goroutine,每个goroutine都可以发送一条消息——因此您需要移动close
(当然,也可以使用其他模式(。表达您需要的模式的最简单方法是:
func overall_produce(... args including channel ...) {
var pg sync.WaitGroup
defer close(ch)
for stuff_to_produce {
pg.Add(1)
go produceInParallel(ch, &pg) // add more args if appropriate
}
pg.Wait()
}
pg
计数器累加活动生产者。每个都必须调用pg.Done()
以指示它是使用ch
完成的。整个制作人现在等待所有这些操作完成,然后它在退出时关闭频道。
(如果您将内部produceInParallel
函数编写为闭包,则不需要显式地将ch
和pg
传递给它。您也可以将overallProducer
编写为闭包。(
请注意,您的单个消费者循环可能最好使用for ... range
构造来表达:
func receive(msg <-chan message, wg *sync.WaitGroup) {
for m := range msg {
fmt.Println("Received:", m)
}
wg.Done()
}
(你提到了在循环中添加一个select
的意图,这样你就可以在消息还没有准备好的情况下进行其他计算。如果该代码不能分解成独立的goroutine,那么你实际上需要更高级的m, ok := <-msg
结构。(
还要注意的是,receive
的wg
与生产者的等待组pg
完全独立,这可能是不必要的,具体取决于您如何构建其他内容。虽然正如所写的那样,在所有生产者完成之前,消费者是无法完成的,但我们希望独立等待生产者完成,这样我们就可以在整个生产者包装中关闭渠道。
试试这段代码,我做了一些修复:
package main
import (
"fmt"
"strconv"
"strings"
"sync"
)
type message struct {
body string
code int
}
var markets []string = []string{"BTC", "ETH", "LTC"}
// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
// for i := 0; i < n; i++ {
var msgToSend = message{
body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
code: 1,
}
fmt.Println("Producing:", msgToSend)
msg <- msgToSend
// }
}
func receive(msg <-chan message, wg *sync.WaitGroup) {
for {
if m, ok := <-msg; ok {
fmt.Println("Received:", m)
wg.Done()
}
}
}
func consume(msg <-chan message) {
for {
if m, ok := <-msg; ok {
fmt.Println("More messages:", m)
} else {
break
}
}
}
func main() {
wg := sync.WaitGroup{}
msgC := make(chan message, 100)
defer func() {
close(msgC)
}()
for ix, market := range markets {
wg.Add(1)
go produce(ix+1, market, msgC, &wg)
}
go receive(msgC, &wg)
wg.Wait()
fmt.Println("Breaking from receiving")
}
只有当main
返回时,才可以close(msgC)
,但同时receive
正在等待close
信号,这就是发生DeadLock的原因。生成消息后,关闭通道。
package main
import (
"fmt"
"strconv"
"strings"
"sync"
)
type message struct {
body string
code int
}
var markets []string = []string{"BTC", "ETH", "LTC"}
// produces messages into the chan
func produce(n int, market string, msg chan<- message, wg *sync.WaitGroup) {
// for i := 0; i < n; i++ {
var msgToSend = message{
body: strings.Join([]string{"market: ", market, ", #", strconv.Itoa(1)}, ""),
code: 1,
}
fmt.Println("Producing:", msgToSend)
msg <- msgToSend
// }
wg.Done()
}
func receive(msg <-chan message, wg *sync.WaitGroup) {
for {
if m, ok := <-msg; ok {
fmt.Println("Received:", m)
} else {
fmt.Println("Breaking from receiving")
break
}
}
wg.Done()
}
func main() {
wg := sync.WaitGroup{}
msgC := make(chan message, 100)
// defer func() {
// close(msgC)
// }()
for ix, market := range markets {
wg.Add(1)
go produce(ix+1, market, msgC, &wg)
}
wg.Wait() // wait for producer
close(msgC)
wg.Add(1)
go receive(msgC, &wg)
wg.Wait()
}