Golang 并发问题引入超时



我希望使用 go 例程在 golang 中实现并行 api 调用。一旦请求被触发,

  1. 我需要等待所有回复(需要不同的时间)。
  2. 如果任何请求失败并返回错误,我希望结束(或假装)例程。
  3. 我还希望有一个与每个 go 例程(或 api 调用)关联的超时值。

我已经为 1 和 2 实现了以下内容,但需要有关如何实现 3 的帮助。此外,对 1 和 2 的反馈也会有所帮助。

package main
import (
"errors"
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
c := make(chan interface{}, 1)
c2 := make(chan interface{}, 1)
err := make(chan interface{})
wg.Add(1)
go func() {
defer wg.Done()
result, e := doSomeWork()
if e != nil {
err <- e
return
}
c <- result
}()
wg.Add(1)
go func() {
defer wg.Done()
result2, e := doSomeWork2()
if e != nil {
err <- e
return
}
c2 <- result2
}()
go func() {
wg.Wait()
close(c)
close(c2)
close(err)
}()
for e := range err {
// here error happend u could exit your caller function
fmt.Println("Error==>", e)
return
}
fmt.Println(<-c, <-c2)
}
// mimic api call 1
func doSomeWork() (function1, error) {
time.Sleep(10 * time.Second)
obj := function1{"ABC", "29"}
return obj, nil
}
type function1 struct {
Name string
Age  string
}
// mimic api call 2
func doSomeWork2() (function2, error) {
time.Sleep(4 * time.Second)
r := errors.New("Error Occured")
if 1 == 2 {
fmt.Println(r)
}
obj := function2{"Delhi", "Delhi"}
// return error as nil for now
return obj, nil
}
type function2 struct {
City  string
State string
}

提前谢谢。

这种分叉和连接模式正是golang.org/x/sync/errgroup的设计目的。(从一组 goroutines 中识别出适当的"第一个错误"可能非常微妙。

您可以使用errgroup.WithContext获取一个context.Context,如果组中的任何 goroutines 返回,该将被取消。(*Group).Wait方法等待 goroutines 完成并返回第一个错误。

对于您的示例,这可能看起来像:https://play.golang.org/p/jqYeb4chHCZ。


然后,您可以通过使用context.WithTimeout包装Context,在任何给定调用中注入超时。

(但是,根据我的经验,如果您正确地尝试了取消,则显式超时几乎永远不会有用 - 如果最终用户厌倦了等待,他们可以显式取消,并且您可能不希望将降级的服务提升为完全中断如果某些事情开始花费的时间比您预期的要长一点。

为了支持超时和取消 goroutine 工作,标准机制是使用上下文。上下文。

ctx := context.Background() // root context
// wrap the context with a timeout and/or cancelation mechanism
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // with timeout or cancel
//ctx, cancel := context.WithCancel(ctx)               // no   timeout just cancel
defer cancel() // avoid memory leak if we never cancel/timeout

接下来,您的工作线程 goroutines 需要支持获取和监视ctx的状态。要与time.Sleep并行执行此操作(以模拟长计算),请将睡眠转换为基于通道的解决方案:

// mimic api call 1
func doSomeWork(ctx context.Context) (function1, error) {
//time.Sleep(10 * time.Second)
select {
case <-time.After(10 * time.Second):
// wait completed
case <-ctx.Done():
return function1{}, ctx.Err()
}
// ...
}

如果一个工作线程 goroutine 失败,要向另一个工作线程发出请求应该中止的信号,只需调用cancel()函数即可。

result, e := doSomeWork(ctx)
if e != nil {
cancel()    // <- add this
err <- e
return
}

把这一切放在一起:

https://play.golang.org/p/1Kpe_tre7XI


EDIT:上面的睡眠示例显然是一个如何中止"假"任务的人为示例。在现实世界中,将涉及httpSQL DB调用 - 并且由于 go1.7&1.8- 标准库为这些可能阻止的调用中的任何一个添加了上下文支持:

func doSomeWork(ctx context.Context) (error) 
// DB
db, err := sql.Open("mysql", "...") // check err
//rows, err := db.Query("SELECT age from users", age)
rows, err := db.QueryContext(ctx, "SELECT age from users", age)
if err != nil {
return err // will return with error if context is canceled
}
// http
// req, err := http.NewRequest("GET", "http://example.com", nil)
req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) // check err
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err // will return with error if context is canceled
}
}

EDIT (2):要在不阻塞的情况下轮询上下文的状态,请利用selectdefault分支:

select {
case <-ctx.Done():
return ctx.Err()
default:
// if ctx is not done - this branch is used
}

default分支可以选择包含代码,但即使它没有代码,它的存在也会阻止阻塞 - 因此只是在那一刻轮询上下文的状态。

最新更新