从goroutine通道读取而不阻塞



我有两个goroutine:主worker和一个helper,为了获得一些帮助,它会从中派生出来。helper可能会遇到错误,所以我使用一个信道将错误从helper传递到worker

func helper(c chan <- error) (){
//do some work
c <- err // send errors/nil on c
}

以下是helper()的名称:

func worker() error {
//do some work
c := make(chan error, 1)
go helper(c)
err := <- c
return err
}

问题:

  • 语句err := <- c是否阻塞worker?我不这么认为,因为通道是缓冲的。

  • 如果它是阻塞的,我如何使它成为非阻塞的?我的要求是让worker及其调用方继续剩下的工作,而不需要等待值出现在通道上。

谢谢。

您可以轻松验证

func helper(c chan<- error) {
time.Sleep(5 * time.Second)
c <- errors.New("") // send errors/nil on c
}
func worker() error {
fmt.Println("do one")
c := make(chan error, 1)
go helper(c)
err := <-c
fmt.Println("do two")
return err
}
func main() {
worker()
}

Q:语句是否为err:=<-c阻塞工人?我不这么认为,因为通道是缓冲的。

A:err := <- c将阻止工作进程。

Q:如果它是阻塞的,我如何使它成为非阻塞的?我的要求是让worker和它的调用者继续剩下的工作,而不需要等待值出现在通道上。

A:如果您不想阻止,只需删除err := <-c即可。如果最后需要出错,只需将err := <-c移到最后即可。

您不能在没有阻塞的情况下读取通道,如果您在没有阻塞情况下通过,则不能再执行此代码,除非您的代码处于循环中。

Loop:
for {
select {
case <-c:
break Loop
default:
//default will go through without blocking
}
// do something
}

你见过errgroup或waitgroup吗

它使用原子、取消上下文和同步。一次来实现这一点。

https://github.com/golang/sync/blob/master/errgroup/errgroup.go

https://github.com/golang/go/blob/master/src/sync/waitgroup.go

或者你可以直接使用它,执行函数,然后在任何你想要的地方等待错误。

在您的代码中,其余的工作与助手是否遇到错误无关。您只需在完成其余工作后从频道接收即可。

func worker() error {
//do some work
c := make(chan error, 1)
go helper(c)
//do rest of the work
return <-c
}

我想你需要这个代码。。

运行此代码

package main
import (
"log"
"sync"
)
func helper(c chan<- error) {
for {
var err error = nil
// do job
if err != nil {
c <- err // send errors/nil on c
break
}
}
}
func worker(c chan error) error {
log.Println("first log")
go func() {
helper(c)
}()
count := 1
Loop:
for {
select {
case err := <- c :
return err
default:
log.Println(count, " log")
count++
isFinished := false
// do your job
if isFinished {
break Loop // remove this when you test
}
}
}
return nil
}
func main() {
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
c := make(chan error, 1)
worker(c)
wg.Done()
}()
wg.Wait()
}

以非阻塞方式从通道读取的功能方式:

func CollectChanOne[T any](ch <-chan T) (T, bool) {
select {
case val, stillOpen := <-ch:
return val, stillOpen
default:
var zeroT T
return zeroT, false
}
}

func worker() error {
//do some work
c := make(chan error, 1)
go helper(c)
err, _ := CollectChanOne(c) // Wont block worker
return err
}

示例:https://go.dev/play/p/Njwyt32B4oT

注意:这个例子还有另一个方法CollectChanRemaining((,它读取通道中所有缓冲的元素。

最新更新