等待组和同步在 Go 中不起作用



我一直在尝试goroutines和channels,我想测试WaitGroup功能。在这里,我尝试执行一个 HTTP 洪水作业,其中父线程生成大量 goroutine,这些 goroutines 将发出无限的请求,除非收到停止消息:

func (hf *HTTPFlood) Run() {
childrenStop := make(chan int, hf.ConcurrentCalls)
stop := false
totalRequests := 0
requestsChan := make(chan int)
totalErrors := 0
errorsChan := make(chan int)
var wg sync.WaitGroup
for i := 0; i < hf.ConcurrentCalls; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-childrenStop:
fmt.Printf("stop childn")
return
default:
_, err := Request(hf.Victim.String())
requestsChan <- 1
if err != nil {
errorsChan <- 1
}
}
}
}()
}
timeout := time.NewTimer(time.Duration(MaximumJobTime) * time.Second)
for !stop {
select {
case req := <- requestsChan:
totalReq += req
case err := <- errorsChan:
totalErrors += err
case <- timeout.C:
fmt.Printf("%s timed upn", hf.Victim.String())
for i := 0; i < hf.ConcurrentCalls; i++ {
childrenStop <- 1
}
close(childrenStop)
stop = true
break
}
}
fmt.Printf("waitingn")
wg.Wait()
fmt.Printf("after waitn")
close(requestsChan)
close(errorsChan)
fmt.Printf("endn")
}

触发超时后,父线程成功退出循环并到达 Wait 指令,但即使 stopChildren 通道已满,子 goroutines 似乎也永远不会在 stopChildren 通道上收到消息。

我错过了什么?

编辑:

因此,问题显然是如何管理通道及其发送/接收。 首先,在所有孩子收到消息之前,孩子停止频道就被关闭了。通道应在等待后关闭

另一方面,由于一旦父线程发送停止信号,就不会对请求Chan或错误Chan进行读取,因此大多数孩子在这两个通道上保持阻塞发送状态。我试图在 Wait 之前在循环之外的父线程中继续阅读,但这不起作用,所以我将实现切换到原子计数器,这似乎是管理此特定用例的更合适方法。

func (hf *HTTPFlood) Run() {
childrenStop := make(chan int, hf.ConcurrentCalls)
var totalReq uint64
var totalErrors uint64
var wg sync.WaitGroup
for i := 0; i < hf.ConcurrentCalls; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-childrenStop:
fmt.Printf("stop childn")
return
default:
_, err := Request(hf.Victim.String())
atomic.AddUint64(&totalReq, 1)
if err != nil {
atomic.AddUint64(&totalErrors, 1)
}
}
}
}()
}
timeout := time.NewTimer(time.Duration(MaximumJobTime) * time.Second)
<- timeout.C
fmt.Printf("%s timed upn", hf.Victim.String())
for i := 0; i < hf.ConcurrentCalls; i++ {
childrenStop <- 1
}
fmt.Printf("waitingn")
wg.Wait()
fmt.Printf("after waitn")
close(childrenStop)
fmt.Printf("endn")

}

您的 go 例程可能会在requestsChan <- 1被阻止。

case <- timeout.C:
fmt.Printf("%s timed upn", hf.Victim.String())
for i := 0; i < hf.ConcurrentCalls; i++ {
childrenStop <- 1
}
close(childrenStop)
stop = true
break

在这里,您向childrenStop发送一个数字,并期望 go 例程接收它。但是当你发送孩子停止信号时,你的例程可能会在requestsChan上发送一些东西。但是,当您在发送关闭信号后中断循环时,没有人监听要接收requestsChan

您可以通过在requestsChan <- 1之前和之后打印一些东西来确认行为来确认这一点。

当您在通道上发送内容而另一端没有人接收时,通道将阻塞

这是一个可能的修改。

package main
import (
"fmt"
"time"
)
func main() {
requestsChan := make(chan int)
done := make(chan chan bool)
for i := 0; i < 5; i++ {
go func(it int) {
for {
select {
case c := <-done:
c <- true
return
default:
requestsChan <- it
}
}
}(i)
}
max := time.NewTimer(1 * time.Millisecond)
allChildrenDone := make(chan bool)
childrenDone := 0
childDone := make(chan bool)
go func() {
for {
select {
case i := <-requestsChan:
fmt.Printf("received %d;", i)
case <-max.C:
fmt.Println("nTimeup")
for i := 0; i < 5; i++ {
go func() {
done <- childDone
fmt.Println("sent done")
}()
}
case <-childDone:
childrenDone++
fmt.Println("child done ", childrenDone)
if childrenDone == 5 {
allChildrenDone <- true
return
}
}
}
}()
fmt.Println("Waiting")
<-allChildrenDone
}

这里要注意的是,我在 go 例程中发送关闭信号,以便在我等待所有孩子干净退出时循环可以继续。

请观看Rob Pike的演讲,其中清楚地涵盖了这些细节。

[编辑]:以前的代码会导致退出后运行例程。

最新更新