通过错误组处理 goroutine 终止和错误处理



我正在尝试以这样的方式并行读取多个文件,以便每个正在读取文件的go例程将其数据写入该通道,然后有一个go例程侦听该通道并将数据添加到映射中。这是我的剧本。

下面是剧中的例子:

package main
import (
"fmt"
"sync"
)
func main() {
var myFiles = []string{"file1", "file2", "file3"}
var myMap = make(map[string][]byte)
dataChan := make(chan fileData, len(myFiles))
wg := sync.WaitGroup{}
defer close(dataChan)
// we create a wait group of N
wg.Add(len(myFiles))
for _, file := range myFiles {
// we create N go-routines, one per file, each one will return a struct containing their filename and bytes from
// the file via the dataChan channel
go getBytesFromFile(file, dataChan, &wg)
}
// we wait until the wait group is decremented to zero by each instance of getBytesFromFile() calling waitGroup.Done()
wg.Wait()
for i := 0; i < len(myFiles); i++ {
// we can now read from the data channel N times.
file := <-dataChan
myMap[file.name] = file.bytes
}
fmt.Printf("%+vn", myMap)
}
type fileData struct {
name  string
bytes []byte
}
// how to handle error from this method if reading file got messed up?
func getBytesFromFile(file string, dataChan chan fileData, waitGroup *sync.WaitGroup) {
bytes := openFileAndGetBytes(file)
dataChan <- fileData{name: file, bytes: bytes}
waitGroup.Done()
}
func openFileAndGetBytes(file string) []byte {
return []byte(fmt.Sprintf("these are some bytes for file %s", file))
}

问题陈述

如何使用 golang.org/x/sync/errgroup 来等待和处理来自goroutines的错误,或者是否有更好的方法,例如使用信号量?例如,如果我的任何一个 go 例程无法从文件中读取数据,那么我想在任何一个例程返回错误的情况下取消所有剩余的数据(在这种情况下,该错误是返回调用方的一个气泡)。它应该自动等待所有提供的 go 例程成功完成,以获得成功案例。

如果文件总数为 100,我也不想生成 100 个 go-routine。如果有办法,我想尽可能控制并行性。

如何使用 golang.org/x/sync/errgroup 来等待和处理来自goroutines的错误,或者是否有更好的方法,例如使用信号量?例如 [...]我想在任何一个例程返回错误的情况下取消所有剩余的那些(在这种情况下,该错误是返回调用方的一个气泡)。它应该自动等待所有提供的 go 例程成功完成,以获得成功案例。

有许多方法可以在 goroutines 之间传达错误状态。 不过,errgroup做了很多繁重的工作,并且适合这种情况。否则,您最终将实现同样的事情。

要使用errgroup我们需要处理错误(对于您的演示,生成一些错误)。 此外,要取消现有的 goroutine,我们将使用errgroup.NewWithContext中的上下文。

从错误组引用,

包 errgroup 为处理常见任务的子任务的 goroutines 组提供同步、错误传播和上下文取消。

您的游戏不支持任何错误处理。 如果我们不进行任何错误处理,则无法收集和取消错误。 所以我添加了一些代码来注入错误处理:

func openFileAndGetBytes(file string) (string, error) {
if file == "file2" {
return "", fmt.Errorf("%s cannot be read", file)
}
return fmt.Sprintf("these are some bytes for file %s", file), nil
}

然后,该错误也必须从getBytesFromFile传回:

func getBytesFromFile(file string, dataChan chan fileData) error {
bytes, err := openFileAndGetBytes(file)
if err == nil {
dataChan <- fileData{name: file, bytes: bytes}
}
return err
}

现在我们已经完成了,我们可以把注意力转向如何启动一些goroutines。

如果文件总数为 100,我也不想生成 100 个 go-routine。如果有办法,我想尽可能控制并行性。

如果写得好,任务数、通道大小和辅助角色数通常是独立的值。 诀窍是使用通道闭包 - 在您的情况下,上下文取消 - 在goroutines之间传达状态。 我们需要一个额外的通道来分发文件名,以及一个额外的goroutine来收集结果。

为了说明这一点,我的代码使用了 3 个工作线程,并添加了更多文件。 我的频道未缓冲。 这使我们能够看到一些文件被处理,而其他文件被中止。如果缓冲通道,该示例仍将有效,但更有可能在处理取消之前处理其他工作。 试验缓冲区大小以及工作线程计数和要处理的文件数。

var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
fileChan := make(chan string)
dataChan := make(chan fileData)

要启动工作线程,我们不是为每个文件启动一个,而是从我们想要的数字开始 - 这里,3。

for i := 0; i < 3; i++ {
worker_num := i
g.Go(func() error {
for file := range fileChan {
if err := getBytesFromFile(file, dataChan); err != nil {
fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
return err
} else if err := ctx.Err(); err != nil {
fmt.Println("worker", worker_num, "context error in worker:", err.Error())
return err
}
}
fmt.Println("worker", worker_num, "processed all work on channel")
return nil
})
}

工作线程调用您的getBytesFromFile函数。 如果它返回一个错误,我们返回一个错误。 在这种情况下,errgroup将自动取消我们的上下文。 但是,操作的确切顺序不是确定的,因此在取消上下文之前可能会也可能不会处理更多文件。 我将在下面展示几种可能性。

通过rangefileChan,工人自动从通道关闭中拾取工作结束。 如果我们收到错误,我们可以立即将其返回给errgroup。 否则,如果上下文已被取消,我们可以立即返回取消错误。

您可能认为g.Go会自动取消我们的函数。 但事实并非如此。 除了进程终止之外,无法取消正在运行的函数Goerrgroup.Group.Go的函数参数必须在适当的时候根据其上下文的状态自行取消。

现在我们可以将注意力转向将文件放在fileChan上的东西。 我们这里有 2 个选项:我们可以使用大小为myFiles的缓冲通道 ,就像你一样。 我们可以用待处理的作业填充整个频道。 仅当您在创建渠道时知道作业数时,此选项才是一个选项。 另一种选择是使用一个额外的"分发"goroutine,它可以阻止对fileChan的写入,以便我们的"主"goroutine可以继续。

// dispatch files
g.Go(func() error {
defer close(fileChan)
done := ctx.Done()
for _, file := range myFiles {
select {
case fileChan <- file:
continue
case <-done:
break
}
}
return ctx.Err()
})

我不确定在这种情况下是否绝对有必要将其放在同一个错误组中,因为我们无法在分发器 goroutine 中出现错误。 但是,无论工作调度程序是否可能生成错误,这种常规模式(从 errgroup 的管道示例中绘制)都有效。

此功能非常简单,但魔力与ctx.Done()通道一起select。 我们要么写到工作频道,要么如果我们的上下文完成,我们就失败了。 这允许我们在工作线程失败一个文件时停止分发工作。

我们defer close(fileChan)这样,无论我们为什么完成(要么我们分发了所有工作,要么上下文被取消),工作人员都知道传入的工作队列(即fileChan)上将不再有工作。

我们还需要一个同步机制:一旦所有工作都分发完毕,并且所有结果都在或工作完成被取消,(例如,在我们的 errgroup 的Wait()返回之后),我们需要关闭我们的结果通道,dataChan。 这会向结果收集器发出信号,表明没有更多要收集的结果。

var err error // we'll need this later!
go func() {
err = g.Wait()
close(dataChan)
}()

我们不能 - 也不需要 - 把它放在errgroup.Group. 该函数无法返回错误,并且无法等待自身close(dataChan)。 所以它进入了一个常规的老套路,没有错误组。

最后,我们可以收集结果。 通过专用的 worker goroutines、一个分发器 goroutine和一个等待工作的 goroutine,并通知不会再有写入dataChan,我们可以在main的"主"goroutine中收集所有结果。

for data := range dataChan {
myMap[data.name] = data.bytes
}
if err != nil { // this was set in our final goroutine, remember
fmt.Println("errgroup Error:", err.Error())
}

我做了一些小的更改,以便更容易看到输出。 您可能已经注意到我将文件内容从[]byte更改为string。 这纯粹是为了便于阅读结果。 为此,我正在使用encoding/json来格式化结果,以便于阅读它们并将其粘贴到 SO 中。 这是我经常用来缩进结构化数据的常见模式:

enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(myMap); err != nil {
panic(err)
}

终于,我们准备好运行了。 现在我们可以看到许多不同的结果,具体取决于 goroutines 的执行顺序。 但它们都是有效的执行路径。

worker 2 failed to process file2 : file2 cannot be read
worker 0 context error in worker: context canceled
worker 1 context error in worker: context canceled
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3"
}
Program exited.

在此结果中,剩余的工作(file4file5)未添加到通道中。 请记住,未缓冲的通道不存储任何数据。 对于要将这些任务写入通道,必须有工作人员在那里读取它们。 相反,上下文在file2失败后被取消,并且分布函数遵循其选择中的<-done路径。file1file3已经处理完毕。

这是一个不同的结果(我只是跑了几次游乐场分享以获得不同的结果)。

worker 1 failed to process file2 : file2 cannot be read
worker 2 processed all work on channel
worker 0 processed all work on channel
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3",
"file4": "these are some bytes for file file4",
"file5": "these are some bytes for file file5",
"file6": "these are some bytes for file file6"
}

在这种情况下,看起来我们的取消有点失败。 但真正发生的事情是,goroutines只是"碰巧"排队并完成其余的工作,然后errorgroup挑出工人的失败并取消上下文。

错误组的作用

当您使用错误组时,您实际上可以从中得到两件事:

  • 轻松访问您的工人返回的第一个错误;
  • 获取错误组将在以下情况下为您取消的上下文

请记住,错误组不会取消 goroutine。 起初这让我有点绊倒。 错误组取消上下文。 您有责任将该上下文的状态应用于您的 goroutines(请记住,goroutine必须自行结束,errorgroup不能结束它)。

关于文件操作上下文和失败的未完成工作的最后一句旁白

大多数文件操作,例如io.Copyos.ReadFile,实际上是后续Read操作的循环。 但ioos并不直接支持上下文。因此,如果您有一个工作线程读取文件,并且您自己没有实现Read循环,您将没有机会根据上下文取消。 在您的情况下,这可能没问题 - 当然,您可能已经读取了比实际需要更多的文件,但这只是因为发生错误时您已经在读取它们。 我个人会接受这种情况,而不是实现我自己的读取循环。

代码

https://go.dev/play/p/9qfESp_eB-C

package main
import (
"context"
"encoding/json"
"fmt"
"os"
"golang.org/x/sync/errgroup"
)
func main() {
var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
fileChan := make(chan string)
dataChan := make(chan fileData)
g, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 3; i++ {
worker_num := i
g.Go(func() error {
for file := range fileChan {
if err := getBytesFromFile(file, dataChan); err != nil {
fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
return err
} else if err := ctx.Err(); err != nil {
fmt.Println("worker", worker_num, "context error in worker:", err.Error())
return err
}
}
fmt.Println("worker", worker_num, "processed all work on channel")
return nil
})
}
// dispatch files
g.Go(func() error {
defer close(fileChan)
done := ctx.Done()
for _, file := range myFiles {
if err := ctx.Err(); err != nil {
return err
}
select {
case fileChan <- file:
continue
case <-done:
break
}
}
return ctx.Err()
})
var err error
go func() {
err = g.Wait()
close(dataChan)
}()
var myMap = make(map[string]string)
for data := range dataChan {
myMap[data.name] = data.bytes
}
if err != nil {
fmt.Println("errgroup Error:", err.Error())
}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(myMap); err != nil {
panic(err)
}
}
type fileData struct {
name,
bytes string
}
func getBytesFromFile(file string, dataChan chan fileData) error {
bytes, err := openFileAndGetBytes(file)
if err == nil {
dataChan <- fileData{name: file, bytes: bytes}
}
return err
}
func openFileAndGetBytes(file string) (string, error) {
if file == "file2" {
return "", fmt.Errorf("%s cannot be read", file)
}
return fmt.Sprintf("these are some bytes for file %s", file), nil
}

最新更新