我正在开发一个并发的 Go 库,我偶然发现了结果相似的 goroutines 之间的两种不同的同步模式:
候补组
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func main() {
words := []string{"foo", "bar", "baz"}
for _, word := range words {
wg.Add(1)
go func(word string) {
time.Sleep(1 * time.Second)
defer wg.Done()
fmt.Println(word)
}(word)
}
// do concurrent things here
// blocks/waits for waitgroup
wg.Wait()
}
渠道
package main
import (
"fmt"
"time"
)
func main() {
words := []string{"foo", "bar", "baz"}
done := make(chan bool)
// defer close(done)
for _, word := range words {
// fmt.Println(len(done), cap(done))
go func(word string) {
time.Sleep(1 * time.Second)
fmt.Println(word)
done <- true
}(word)
}
// Do concurrent things here
// This blocks and waits for signal from channel
for range words {
<-done
}
}
有人告诉我sync.WaitGroup
性能稍高一些,而且我已经看到它被普遍使用。但是,我发现频道更惯用。使用sync.WaitGroup
而不是渠道的真正优势是什么和/或当它更好时可能是什么情况?
与第二个示例的正确性无关(如评论中所述,您没有按照自己的想法做,但它很容易修复),我倾向于认为第一个示例更容易掌握。
现在,我什至不会说频道更惯用。通道是 Go 语言的标志性特征并不意味着尽可能使用它们是惯用的。Go 中的惯用语是使用最简单、最容易理解的解决方案:在这里,WaitGroup
传达含义(您的主要功能是为工人完成Wait
)和机制(工人在Done
时通知)。
除非您处于非常特殊的情况下,否则我不建议您在此处使用渠道解决方案。
对于您的简单示例(表示作业完成),WaitGroup
是显而易见的选择。Go 编译器非常友善,不会责怪您使用通道来简单发出完成任务的信号,但一些代码审阅者会这样做。
- "等待组等待一组 goroutines 完成。主要的goroutine调用
Add(n)
来设置数量等待的套路。然后每个goroutines完成后运行和调用Done()
。与此同时,等待可以用来阻止,直到所有 goroutines 都完成。
words := []string{"foo", "bar", "baz"}
var wg sync.WaitGroup
for _, word := range words {
wg.Add(1)
go func(word string) {
defer wg.Done()
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
}(word)
}
wg.Wait()
<小时 />可能性仅受您的想象力限制:
- 通道可以缓冲:
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, len(words))
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // not blocking
}(word)
}
for range words {
<-done
}
- 通道
- 可以是无缓冲的,您可以只使用信令通道(例如
chan struct{}
):
words := []string{"foo", "bar", "baz"}
done := make(chan struct{})
for _, word := range words {
go func(word string) {
time.Sleep(100 * time.Millisecond) // a job
fmt.Println(word)
done <- struct{}{} // blocking
}(word)
}
for range words {
<-done
}
- 您可以限制具有缓冲通道容量的并发作业数:
t0 := time.Now()
var wg sync.WaitGroup
words := []string{"foo", "bar", "baz"}
done := make(chan struct{}, 1) // set the number of concurrent job here
for _, word := range words {
wg.Add(1)
go func(word string) {
done <- struct{}{}
time.Sleep(100 * time.Millisecond) // job
fmt.Println(word, time.Since(t0))
<-done
wg.Done()
}(word)
}
wg.Wait()
- 您可以使用以下渠道发送消息:
done := make(chan string)
go func() {
for _, word := range []string{"foo", "bar", "baz"} {
done <- word
}
close(done)
}()
for word := range done {
fmt.Println(word)
}
基准:
go test -benchmem -bench . -args -n 0
# BenchmarkEvenWaitgroup-8 1827517 652 ns/op 0 B/op 0 allocs/op
# BenchmarkEvenChannel-8 1000000 2373 ns/op 520 B/op 1 allocs/op
go test -benchmem -bench .
# BenchmarkEvenWaitgroup-8 1770260 678 ns/op 0 B/op 0 allocs/op
# BenchmarkEvenChannel-8 1560124 1249 ns/op 158 B/op 0 allocs/op
代码( main_test.go
):
package main
import (
"flag"
"fmt"
"os"
"sync"
"testing"
)
func BenchmarkEvenWaitgroup(b *testing.B) {
evenWaitgroup(b.N)
}
func BenchmarkEvenChannel(b *testing.B) {
evenChannel(b.N)
}
func evenWaitgroup(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
wg.Add(1)
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
wg.Done()
}(i)
}
wg.Wait()
}
func evenChannel(n int) {
if n%2 == 1 { // make it even:
n++
}
for i := 0; i < n; i++ {
go func(n int) {
select {
case ch <- n: // tx if channel is empty
case i := <-ch: // rx if channel is not empty
// fmt.Println(n, i)
_ = i
}
done <- struct{}{}
}(i)
}
for i := 0; i < n; i++ {
<-done
}
}
func TestMain(m *testing.M) {
var n int // We use TestMain to set up the done channel.
flag.IntVar(&n, "n", 1_000_000, "chan cap")
flag.Parse()
done = make(chan struct{}, n)
fmt.Println("n=", n)
os.Exit(m.Run())
}
var (
done chan struct{}
ch = make(chan int)
wg sync.WaitGroup
)
这取决于用例。 如果要调度要并行运行的一次性作业,而无需知道每个作业的结果,则可以使用 WaitGroup
。 但是,如果您需要从goroutines收集结果,那么您应该使用通道。
由于频道是双向的,所以我几乎总是使用频道。
另一方面,正如评论中指出的那样,您的频道示例未正确实现。 您需要一个单独的渠道来指示没有更多工作要做(此处有一个例子)。 在您的情况下,由于您事先知道单词的数量,因此您可以只使用一个缓冲通道并接收固定次数以避免声明关闭通道。
如果你对只使用通道特别粘人,那么就需要以不同的方式完成(如果我们使用你的示例,正如@Not_a_Golfer指出的那样,它会产生不正确的结果)。
一种方法是制作一个 int 类型的通道。在工作进程中,每次完成作业时发送一个数字(这也可以是唯一的作业 ID,如果需要,可以在接收器中跟踪它)。
在接收器主 go 例程中(它将知道提交的作业的确切数量) - 在通道上执行范围循环,直到提交的作业数量未完成,并在所有作业完成时脱离循环。如果您想跟踪每个作业的完成情况(如果需要,可以做一些事情),这是一个很好的方法。
这是供您参考的代码。减少 totalJobsLeft 将是安全的,因为它只会在通道的范围循环中完成!
//This is just an illustration of how to sync completion of multiple jobs using a channel
//A better way many a times might be to use wait groups
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
comChannel := make(chan int)
words := []string{"foo", "bar", "baz"}
totalJobsLeft := len(words)
//We know how many jobs are being sent
for j, word := range words {
jobId := j + 1
go func(word string, jobId int) {
fmt.Println("Job ID:", jobId, "Word:", word)
//Do some work here, maybe call functions that you need
//For emulating this - Sleep for a random time upto 5 seconds
randInt := rand.Intn(5)
//fmt.Println("Got random number", randInt)
time.Sleep(time.Duration(randInt) * time.Second)
comChannel <- jobId
}(word, jobId)
}
for j := range comChannel {
fmt.Println("Got job ID", j)
totalJobsLeft--
fmt.Println("Total jobs left", totalJobsLeft)
if totalJobsLeft == 0 {
break
}
}
fmt.Println("Closing communication channel. All jobs completed!")
close(comChannel)
}
我经常使用通道从 goroutines 收集可能产生错误的错误消息。下面是一个简单的示例:
func couldGoWrong() (err error) {
errorChannel := make(chan error, 3)
// start a go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 0; c < 10; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start another go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 10; c < 100; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// start yet another go routine
go func() (err error) {
defer func() { errorChannel <- err }()
for c := 100; c < 1000; c++ {
_, err = fmt.Println(c)
if err != nil {
return
}
}
return
}()
// synchronize go routines and collect errors here
for c := 0; c < cap(errorChannel); c++ {
err = <-errorChannel
if err != nil {
return
}
}
return
}
这里已经有很好的答案,频道并不总是惯用语。例如,当实现工作线程池时,使用等待组会更清楚。
另外,请注意,您的通道实现不正确,因为它在第一次进入而不是最后一次进入后退出。
我决定修复它:
package main
import (
"fmt"
"time"
)
func main() {
words := []string{"foo", "bar", "baz", "fax", "bor", "far"}
workersCount := len(words)
workersChan := make(chan bool, workersCount)
for _, word := range words {
go func(word string) {
time.Sleep(1 * time.Second)
fmt.Println(word)
workersChan <- true
}(word)
}
for i := 0; i != workersCount; i++ {
<-workersChan
}
}
还建议使用waitgroup,但你仍然想用频道来做,那么下面我提到频道的简单用法
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan string)
words := []string{"foo", "bar", "baz"}
go printWordrs(words, c)
for j := range c {
fmt.Println(j)
}
}
func printWordrs(words []string, c chan string) {
defer close(c)
for _, word := range words {
time.Sleep(1 * time.Second)
c <- word
}
}