如何将渠道和等待组的工作结合起来

  • 本文关键字:工作 结合 起来 等待 go
  • 更新时间 :
  • 英文 :


有一个代码:

func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf("res - %v, err - %vn", res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
result := make([]int, 0)
errCh := make(chan error)
wg := &sync.WaitGroup{}
wg.Add(len(args))
for _, a := range args {
go func(aLit int) {
value, err := RandomError(aLit)
if err != nil {
errCh <- err
return
}
result = append(result, value)
}(a)
}
errValue := <-errCh // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
if errValue != nil {
fmt.Println("returning because err")
return nil, errValue // if RandomError() returns error, returned here as expected
}
wg.Wait()
return result, nil
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance > 40 {
fmt.Printf("error on arg - %dn", arg)
return 0, errors.New("mock err")
}
return errChance, nil
}

我需要,如果RandomError()函数返回错误,那么MockFunc()函数在等待所有waitgroups之前完成并返回错误。但如果没有错误,那么我得到一个deadlock,如果有,那么一切都如预期。

我知道这是因为我没有关闭频道。但是,如果我在wg.Wait()之后关闭它,那么这将失去意义,因为如果对函数的第一次调用返回错误,那么我将等待所有其他调用的结果。

我需要它,这样,如果对RandomeErr()的某个调用返回错误,我就可以在不等待所有调用结束的情况下将此错误从MockFunc()返回到main()

使用errgroup.Group可能更容易。

我还必须创建另一个goroutine和通道来将结果读取到数组中。如果您在许多goroutine中使用result = append(result, value),您会在某个时刻出现错误。

使用errgroup.Group的好处之一是,在函数返回之前,必须清理并停止所有goroutine。当这些类型的函数是大型系统的一部分时,这有助于防止内存泄漏。

package main
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf("res - %v, err - %vn", res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
resultChan := make(chan int)
result := make([]int, 0)
eg, ctx := errgroup.WithContext(context.Background())
// Read all results into an array
eg.Go(func() error {
for {
select {
case <-ctx.Done():
// Always check to see if the context has cancelled,
// if there is an error errgroup will cancel the
// context and all goroutines will need to exit
// before `eg.Wait` returns.
return context.Canceled
case val := <-resultChan:
result = append(result, val)
if len(result) == len(args) {
return nil
}
}
}
})
for _, a := range args {
aLit := a // Copy the value so that we don't re-use the memory address
eg.Go(func() error {
value, err := RandomError(aLit)
if err != nil {
return err
}
select {
case resultChan <- value:
case <-ctx.Done():
return context.Canceled
default:
}
return nil
})
}
return result, eg.Wait()
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance > 40 {
fmt.Printf("error on arg - %dn", arg)
return 0, errors.New("mock err")
}
return errChance, nil
}

如果您将所有结果缓冲到一个通道中,也可以更简单:

func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
resultChan := make(chan int, len(args))
result := make([]int, 0)
eg, ctx := errgroup.WithContext(context.Background())
for _, a := range args {
aLit := a // Copy the value so that we don't re-use the memory address
eg.Go(func() error {
value, err := RandomError(aLit)
if err != nil {
return err
}
select {
case resultChan <- value:
case <-ctx.Done():
return context.Canceled
default:
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
close(resultChan)
for val := range resultChan {
result = append(result, val)
}
return result, nil
}

这里是我能想到的最简单的实现方式:


func MockFunc() (result []int, err error) {
args := []int{1, 2, 3, 4, 5}
resultChan := make(chan int)
errChan := make(chan error)
for _, a := range args {
go func(aLit int) {
value, err := RandomError(aLit)
if err != nil {
errChan <- err
}
resultChan <- value
}(a)
}
for i := 0; i < len(args); i++ {
select {
case err := <-errChan:
return nil, err
case val := <-resultChan:
result = append(result, val)
}
}
return result, nil
}

您有三个死锁:

  1. 错误值:=&lt-errCh//如果没有错误,我会出现panic-致命错误:所有goroutine都处于休眠状态-死锁
  2. 等待((#第44行
  3. errCh<-错误#第29行

由于return语句或第一个死锁,第二个事件从未发生。为了修复死锁1,我们必须使用非阻塞模式读取通道。为了修复死锁2,我们必须使用wg。在每个goroutine中完成。为了修复死锁3,我们必须使用缓冲通道,或者以某种方式使用该通道。为了简单起见,我选择了缓冲通道,但我们可以读取循环内部的errCh,如果看到错误,则返回错误。

package main
import (
"errors"
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
res, err := MockFunc()
fmt.Printf("res - %v, err - %vn", res, err)
}
func MockFunc() ([]int, error) {
args := []int{1, 2, 3, 4, 5}
result := make([]int, 0)
errCh := make(chan error, len(args))
wg := &sync.WaitGroup{}
wg.Add(len(args))
for _, a := range args {
go func(aLit int) {
defer wg.Done()
value, err := RandomError(aLit)
if err != nil {
errCh <- err
return
}
result = append(result, value)
}(a)
}
var errValue error
select {
case errValue = <-errCh: // if no errors, i have panic - fatal error: all goroutines are asleep - deadlock!
default:
}
if errValue != nil {
fmt.Println("returning because err")
return nil, errValue // if RandomError() returns error, returned here as expected
}
wg.Wait()
return result, nil
}
func RandomError(arg int) (int, error) {
time.Sleep(time.Millisecond * 100 * time.Duration(arg))
errChance := rand.Intn(100)
if errChance > 40 {
fmt.Printf("error on arg - %dn", arg)
return 0, errors.New("mock err")
}
return errChance, nil
}

最新更新