当我返回多个错误时,go例程会感到恐慌



我正在处理工作池,我想在返回错误之前合并工作池中的所有错误。我已经编写了一个示例代码,但我正进入死锁状态。

我想实现什么?一个客户端发送100个请求,我想首先将这些请求添加到一个作业队列中,并将其分派到n个在后台执行任务的go例程中,如果有错误,我想在将所有错误发送到客户端之前累积所有这些错误。我写了一个片段,有人能解释出了什么问题以及如何缓解僵局吗。

package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/apex/log"
"github.com/hashicorp/go-multierror"
)
type Manager struct {
taskChan    chan int
wg          *sync.WaitGroup
QuitChan    chan bool
ErrorChan   chan error
busyWorkers int64
}
func main() {
fmt.Println("Hello, 世界")
m := New()
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
for i := 0; i < 3; i++ {
m.wg.Add(1)
go m.run(ctx, test)
}
for i := 1; i < 5; i++ {
m.taskChan <- i
}
close(m.taskChan)
go func(*Manager) {
if len(m.taskChan) == 0 {
m.QuitChan <- true
}
}(m)
var errors error
for {
select {
case err := <-m.ErrorChan:
errors = multierror.Append(errors, err)
if m.busyWorkers == int64(0) {
break
}
default:
fmt.Println("hello")
}
}
m.wg.Wait()
fmt.Println(errors)
}
func New() *Manager {
return &Manager{taskChan: make(chan int),
wg:        new(sync.WaitGroup),
QuitChan:  make(chan bool),
ErrorChan: make(chan error),
}
}
func (m *Manager) run(ctx context.Context, fn func(a, b int) error) {
defer m.wg.Done()
defer fmt.Println("finished working")
for {
select {
case t, ok := <-m.taskChan:
if ok {
atomic.AddInt64(&m.busyWorkers, 1)
err := fn(t, t)
if err != nil {
m.ErrorChan <- err
}
atomic.AddInt64(&m.busyWorkers, -1)
}
case <-ctx.Done():
log.Infof("closing channel %v", ctx.Err())
return
case <-m.QuitChan:
return
}
}
}
// this can return error or not, this is the main driver func, but i'm propagating 
//errors so that i can understand where i am going wrong
func test(a, b int) error {
fmt.Println(a, b)
return fmt.Errorf("dummy error %v", a)
}

您有3个工人,他们都返回错误。

您的主线程尝试将5个作业放入队列中。一旦前3个工作线程被您的工作线程占用,主线程就会在taskChan上等待新的工作线程接收,而您的所有3个工作进程都会在ErrorChan上尝试发送数据。

换句话说,就是死锁。

也许你想让taskChan成为一个缓冲通道?这样,您就可以在上面发送数据,直到缓冲区满为止,而不会阻塞。

taskChan: make(chan int, 10)

最新更新