延迟呼叫同步.Goroutine中的WaitGroup.Wait():为什么要这样做?



我正在尝试了解vegeta负载测试工具/库源代码中的Attack()函数(https://github.com/tsenart/vegeta/blob/44a49c878dd6f28f04b9b5ce5751490b0dce1e18/lib/attack.go#L253-L312)。我创建了一个简化的示例:

package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go attack(&wg)
}
// wg.Wait()
go func() {
defer wg.Wait()
}()
}
func attack(wg *sync.WaitGroup) {
defer wg.Done()
time.Sleep(1 * time.Second)
fmt.Println("foobar")
}

我注意到的是,此函数立即返回而不打印foobar10 次。只有当行中的注释wg.Wait()我看到foobar10 秒后打印 1 次。这对我来说很有意义,因为main()函数在调用wg.Wait()之前返回。

那么,我不明白的是Attack()方法在vegeta中是如何工作的,因为它似乎遵循类似的模式:

func (a *Attacker) Attack(tr Targeter, p Pacer, du time.Duration, name string) <-chan *Result {
var wg sync.WaitGroup
workers := a.workers
if workers > a.maxWorkers {
workers = a.maxWorkers
}
results := make(chan *Result)
ticks := make(chan struct{})
for i := uint64(0); i < workers; i++ {
wg.Add(1)
go a.attack(tr, name, &wg, ticks, results)
}
go func() {
defer close(results)
defer wg.Wait()
defer close(ticks)
began, count := time.Now(), uint64(0)
for {
elapsed := time.Since(began)
if du > 0 && elapsed > du {
return
}
wait, stop := p.Pace(elapsed, count)
if stop {
return
}
time.Sleep(wait)
if workers < a.maxWorkers {
select {
case ticks <- struct{}{}:
count++
continue
case <-a.stopch:
return
default:
// all workers are blocked. start one more and try again
workers++
wg.Add(1)
go a.attack(tr, name, &wg, ticks, results)
}
}
select {
case ticks <- struct{}{}:
count++
case <-a.stopch:
return
}
}
}()
return results
}

其中attack()方法读取

func (a *Attacker) attack(tr Targeter, name string, workers *sync.WaitGroup, ticks <-chan struct{}, results chan<- *Result) {
defer workers.Done()
for range ticks {
results <- a.hit(tr, name)
}
}

我不明白为什么Attack()函数不会在不调用attack()的情况下立即返回,因为它的wg.Wait()在 Goroutine 中?

贝吉塔的Attack也会立即返回,但有一个通道,该通道由仍在运行的 goroutines 填充。 一旦这些完成,通道就会关闭(defer close(results)),使代码能够result来检测完成。

例;

package main
import (
"fmt"
"sync"
"time"
)
func main() {
results := attacks()
fmt.Println("attacks returned")
for result := range results {
fmt.Println(result)
}
}
func attacks() chan string {
// A channel to hold the results
c := make(chan string)
// Fire 10 routines populating the channel
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
attack(c)
wg.Done()
}()
}
// Close channel once routines are finished
go func() {
wg.Wait()
close(c)
}()
//
return c
}
func attack(c chan<- string) {
time.Sleep(1 * time.Second)
c <- "foobar"
}

相关内容

最新更新