使用通道作为队列的死锁



我正在学习Go,并试图实现作业队列。

我想做的是:

让主goroutine通过一个通道为多个解析器工作者(将一行解析为s结构(提供提要行,并让每个解析器将结构发送到其他工作者(goroutine(将处理的结构通道(发送到数据库等(。

代码如下:

lineParseQ := make(chan string, 5)
jobProcessQ := make(chan myStruct, 5)
doneQ := make(chan myStruct, 5)
fileName := "myfile.csv"
file, err := os.Open(fileName)
if err != nil {
log.Fatal(err)
}
defer file.Close()
reader := bufio.NewReader(file)
// Start line parsing workers and send to jobProcessQ
for i := 1; i <= 2; i++ {
go lineToStructWorker(i, lineParseQ, jobProcessQ)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
lineCount := 0 
countSend := 0
for {
line, err := reader.ReadString('n')

if err != nil && err != io.EOF {
log.Fatal(err)
}

if err == io.EOF {
break
}

lineCount++

if lineCount > 1 {
countSend++
lineParseQ <- line[:len(line)-1]    // Avoid last char 'n'
}
}
for i := 0; i < countSend; i++ {
fmt.Printf("Received %+v.n", <-doneQ)
}
close(doneQ)
close(jobProcessQ)
close(lineParseQ)

这是一个简化的游乐场:https://play.golang.org/p/yz84g6CJraa

工人看起来是这样的:

func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct ) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
}

我知道这个问题与";完成";因为如果我不使用它,就没有错误,但我不知道如何修复它。

在完成将所有行发送到lineParseQ之前,您不会开始doneQ读取,这比缓冲区空间还多。因此,一旦doneQ缓冲区满了,就会发送块,开始填充lineParseQ缓冲区,一旦满了,它就会死锁。将发送到lineParseQ的循环、从doneQ读取的循环或两者移动到单独的goroutine,例如:

go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()

这在最后仍然会死锁,因为在同一个goroutine中,通道上有一个range,通道后面有close;由于range一直持续到通道关闭,而关闭发生在range完成之后,因此仍然存在死锁。你需要在适当的地方关闭;即,在发送例程中,或者如果给定信道有多个发送器,则在监视发送例程的WaitGroup上被阻止。

// Start line parsing workers and send to jobProcessQ
wg := new(sync.WaitGroup)
for i := 1; i <= 2; i++ {
wg.Add(1)
go lineToStructWorker(i, lineParseQ, jobProcessQ, wg)
}
// Process myStruct from jobProcessQ
for i := 1; i <= 5; i++ {
go WorkerProcessStruct(i, jobProcessQ, doneQ)
}
countSend := 0
go func() {
for _, line := range lines {
countSend++
lineParseQ <- line
}
close(lineParseQ)
}()
go func() {
wg.Wait()
close(jobProcessQ)
}()
for a := range doneQ {
fmt.Printf("Received %v.n", a)
}
// ...
func lineToStructWorker(workerID int, lineQ <-chan string, strQ chan<- myStruct, wg *sync.WaitGroup) {
for j := range lineQ {
strQ <- lineToStruct(j) // just parses the csv to a struct...
}
wg.Done()
}
func WorkerProcessStruct(workerID int, strQ <-chan myStruct, done chan<- myStruct) {
for a := range strQ {
time.Sleep(time.Millisecond * 500) // fake long operation...
done <- a
}
close(done)
}

完整的工作示例如下:https://play.golang.org/p/XsnewSZeb2X

sync.WaitGroup协调管道,将每个部件分为多个阶段。当您知道管道的一部分已经完成(并且没有人正在向特定通道写入(时,关闭通道以指示所有";工人";退出,例如

var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
i := i
wg.Add(1)
go func() {
Worker(i)
wg.Done()
}()
}
// wg.Wait() signals the above have completed

缓冲通道可以方便地处理突发工作负载,但有时在糟糕的设计中会用来避免死锁。如果你想避免在goroutine中运行管道的某些部分,你可以缓冲一些通道(通常与工作人员的数量相匹配(,以避免主goroutine堵塞。

如果你有相关的文章读&编写并希望避免死锁-确保它们在单独的goroutines中。拥有管道的所有部分,它自己的goroutine甚至可以消除对缓冲通道的需求:

// putting all channel work into separate goroutines
// removes the need for buffered channels
lineParseQ := make(chan string, 0)
jobProcessQ := make(chan myStruct, 0)
doneQ := make(chan myStruct, 0)

当然,这是一种权衡——goroutine的资源成本约为2K——而缓冲通道的资源成本要低得多。与大多数设计一样,这取决于它的使用方式。

此外,不要被臭名昭著的Go for循环抓住,所以使用闭包赋值来避免这种情况:

for i := 1; i <= 5; i++ {
i := i       // new i (not the i above)
go func() {
myfunc(i) // otherwise all goroutines will most likely get '5'
}()
}

最后,请确保在退出之前等待所有结果得到处理。从基于通道的函数返回并认为所有结果都已处理是一个常见的错误。在服务中,这最终将是真的。但在独立的可执行文件中,处理循环可能仍在处理结果。

go func() {
wgW.Wait()   // waiting on worker goroutines to finish
close(doneQ) // safe to close results channel now
}()
// ensure we don't return until all results have been processed
for a := range doneQ {
fmt.Printf("Received %v.n", a)
}

通过在主goroutine中处理结果,我们确保在没有处理完所有内容的情况下不会过早返回。

综合起来:

https://play.golang.org/p/MjLpQ5xglP3

最新更新