如何在Go中使用sync/errgroup包编写并发for循环



我想并发地对切片的元素执行操作
我正在使用sync/errgroup包来处理并发性

这是Go Playground上的最小复制https://go.dev/play/p/yBCiy8UW_80

import (
"fmt"
"golang.org/x/sync/errgroup"
)
func main() {
eg := errgroup.Group{}
input := []int{0, 1, 2}
output1 := []int{}
output2 := make([]int, len(input))
for i, n := range input {
eg.Go(func() (err error) {
output1 = append(output1, n+1)
output2[i] = n + 1
return nil
})
}
eg.Wait()
fmt.Printf("with append %+v", output1)
fmt.Println()
fmt.Printf("with make %+v", output2)
}

输出
with append [3 3 3]
with make [0 0 3]

与预期[1 2 3]

这里有两个独立的问题:


首先,循环中的变量在每个线程程序有机会读取它们之前发生了变化。当你有一个像

这样的循环
for i, n, := range input {
// ...
}

变量in在整个循环期间有效。当控制到达循环的底部并跳回到顶部时,这些变量被赋予新值。如果在循环中启动的程序使用了这些变量,那么它们的值将不可预测地改变。这就是为什么您会在示例的输出中多次看到相同的数字。在第一次循环迭代中启动的例程直到n已经设置为2才开始执行。

要解决这个问题,您可以按照NotX的答案所示的方法,创建新的变量,这些变量的作用域仅为循环的一次迭代:

for i, n := range input {
ic, nc := i, n
// use ic and nc instead of i and n
}

在循环中声明的变量的作用域仅为循环的一次迭代,因此当下一次循环开始时,将创建全新的变量,从而防止在启动程序和实际开始运行程序之间更改原始变量。


第二,你同时修改来自不同例程的相同值,这是不安全的。特别是,您正在使用append并发地追加到同一片。在这种情况下会发生什么是不确定的,各种不好的事情都可能发生。

有两种方法来处理这个问题。您已经设置的第一个:使用make预分配一个输出片,然后让每个例程填充片中的特定位置:

output := make([]int, 3)
for i, n := range input {
ic, nc := i, n
eg.Go(func() (err error) {
output[ic] = nc + 1
return nil
})
}
eg.Wait()

当你开始循环时,如果你知道你将有多少输出,这将非常有效。

另一种选择是使用某种锁来控制对输出片的访问。sync.Mutex非常适合这个:

var output []int
mu sync.Mutex
for _, n := range input {
nc := n
eg.Go(func() (err error) {
mu.Lock()
defer mu.Unlock()
output = append(output, nc+1)
return nil
})
}
eg.Wait()

如果你不知道你有多少个输出,它可以工作,但是它不能保证输出的顺序——它可以是任何顺序。如果你想把它按顺序排列,你可以在所有的例程结束后做一些排序。

在运行一些go例程时不能保证顺序。因此,虽然可以期望元素1,2,3,但您不应该对顺序进行任何假设。

无论如何,看起来第一个eg.Go()调用发生在forfor循环实际上到达它的第三个元素时。这就是为什么你只能得到3,并且索引访问只能在第3个位置(i=2)。

如果你像这样复制你的值,这个问题在某种程度上是固定的:

for i, n := range input {
nc, ic := n, i
eg.Go(func() (err error) {
output1 = append(output1, nc+1)
output2[ic] = nc + 1
return nil
})
}
也就是说,结果看起来像
with append [3 2 1]
with make [1 2 3]

对我来说,所以订单仍然不是我们可能期望的。不过,我不是errgroup包的专家,所以也许其他人可以分享更多关于执行顺序的信息。

最新更新