我正在尝试以这样的方式并行读取多个文件,以便每个正在读取文件的go例程将其数据写入该通道,然后有一个go例程侦听该通道并将数据添加到映射中。这是我的剧本。
下面是剧中的例子:
package main
import (
"fmt"
"sync"
)
func main() {
var myFiles = []string{"file1", "file2", "file3"}
var myMap = make(map[string][]byte)
dataChan := make(chan fileData, len(myFiles))
wg := sync.WaitGroup{}
defer close(dataChan)
// we create a wait group of N
wg.Add(len(myFiles))
for _, file := range myFiles {
// we create N go-routines, one per file, each one will return a struct containing their filename and bytes from
// the file via the dataChan channel
go getBytesFromFile(file, dataChan, &wg)
}
// we wait until the wait group is decremented to zero by each instance of getBytesFromFile() calling waitGroup.Done()
wg.Wait()
for i := 0; i < len(myFiles); i++ {
// we can now read from the data channel N times.
file := <-dataChan
myMap[file.name] = file.bytes
}
fmt.Printf("%+vn", myMap)
}
type fileData struct {
name string
bytes []byte
}
// how to handle error from this method if reading file got messed up?
func getBytesFromFile(file string, dataChan chan fileData, waitGroup *sync.WaitGroup) {
bytes := openFileAndGetBytes(file)
dataChan <- fileData{name: file, bytes: bytes}
waitGroup.Done()
}
func openFileAndGetBytes(file string) []byte {
return []byte(fmt.Sprintf("these are some bytes for file %s", file))
}
问题陈述
如何使用 golang.org/x/sync/errgroup 来等待和处理来自goroutines的错误,或者是否有更好的方法,例如使用信号量?例如,如果我的任何一个 go 例程无法从文件中读取数据,那么我想在任何一个例程返回错误的情况下取消所有剩余的数据(在这种情况下,该错误是返回调用方的一个气泡)。它应该自动等待所有提供的 go 例程成功完成,以获得成功案例。
如果文件总数为 100,我也不想生成 100 个 go-routine。如果有办法,我想尽可能控制并行性。
如何使用 golang.org/x/sync/errgroup 来等待和处理来自goroutines的错误,或者是否有更好的方法,例如使用信号量?例如 [...]我想在任何一个例程返回错误的情况下取消所有剩余的那些(在这种情况下,该错误是返回调用方的一个气泡)。它应该自动等待所有提供的 go 例程成功完成,以获得成功案例。
有许多方法可以在 goroutines 之间传达错误状态。 不过,errgroup
做了很多繁重的工作,并且适合这种情况。否则,您最终将实现同样的事情。
要使用errgroup
我们需要处理错误(对于您的演示,生成一些错误)。 此外,要取消现有的 goroutine,我们将使用errgroup.NewWithContext
中的上下文。
从错误组引用,
包 errgroup 为处理常见任务的子任务的 goroutines 组提供同步、错误传播和上下文取消。
您的游戏不支持任何错误处理。 如果我们不进行任何错误处理,则无法收集和取消错误。 所以我添加了一些代码来注入错误处理:
func openFileAndGetBytes(file string) (string, error) {
if file == "file2" {
return "", fmt.Errorf("%s cannot be read", file)
}
return fmt.Sprintf("these are some bytes for file %s", file), nil
}
然后,该错误也必须从getBytesFromFile
传回:
func getBytesFromFile(file string, dataChan chan fileData) error {
bytes, err := openFileAndGetBytes(file)
if err == nil {
dataChan <- fileData{name: file, bytes: bytes}
}
return err
}
现在我们已经完成了,我们可以把注意力转向如何启动一些goroutines。
如果文件总数为 100,我也不想生成 100 个 go-routine。如果有办法,我想尽可能控制并行性。
如果写得好,任务数、通道大小和辅助角色数通常是独立的值。 诀窍是使用通道闭包 - 在您的情况下,上下文取消 - 在goroutines之间传达状态。 我们需要一个额外的通道来分发文件名,以及一个额外的goroutine来收集结果。
为了说明这一点,我的代码使用了 3 个工作线程,并添加了更多文件。 我的频道未缓冲。 这使我们能够看到一些文件被处理,而其他文件被中止。如果缓冲通道,该示例仍将有效,但更有可能在处理取消之前处理其他工作。 试验缓冲区大小以及工作线程计数和要处理的文件数。
var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
fileChan := make(chan string)
dataChan := make(chan fileData)
要启动工作线程,我们不是为每个文件启动一个,而是从我们想要的数字开始 - 这里,3。
for i := 0; i < 3; i++ {
worker_num := i
g.Go(func() error {
for file := range fileChan {
if err := getBytesFromFile(file, dataChan); err != nil {
fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
return err
} else if err := ctx.Err(); err != nil {
fmt.Println("worker", worker_num, "context error in worker:", err.Error())
return err
}
}
fmt.Println("worker", worker_num, "processed all work on channel")
return nil
})
}
工作线程调用您的getBytesFromFile
函数。 如果它返回一个错误,我们返回一个错误。 在这种情况下,errgroup
将自动取消我们的上下文。 但是,操作的确切顺序不是确定的,因此在取消上下文之前可能会也可能不会处理更多文件。 我将在下面展示几种可能性。
通过range
fileChan
,工人自动从通道关闭中拾取工作结束。 如果我们收到错误,我们可以立即将其返回给errgroup
。 否则,如果上下文已被取消,我们可以立即返回取消错误。
您可能认为g.Go
会自动取消我们的函数。 但事实并非如此。 除了进程终止之外,无法取消正在运行的函数Go
。errgroup.Group.Go
的函数参数必须在适当的时候根据其上下文的状态自行取消。
现在我们可以将注意力转向将文件放在fileChan
上的东西。 我们这里有 2 个选项:我们可以使用大小为myFiles
的缓冲通道 ,就像你一样。 我们可以用待处理的作业填充整个频道。 仅当您在创建渠道时知道作业数时,此选项才是一个选项。 另一种选择是使用一个额外的"分发"goroutine,它可以阻止对fileChan
的写入,以便我们的"主"goroutine可以继续。
// dispatch files
g.Go(func() error {
defer close(fileChan)
done := ctx.Done()
for _, file := range myFiles {
select {
case fileChan <- file:
continue
case <-done:
break
}
}
return ctx.Err()
})
我不确定在这种情况下是否绝对有必要将其放在同一个错误组中,因为我们无法在分发器 goroutine 中出现错误。 但是,无论工作调度程序是否可能生成错误,这种常规模式(从 errgroup 的管道示例中绘制)都有效。
此功能非常简单,但魔力与ctx.Done()
通道一起select
。 我们要么写到工作频道,要么如果我们的上下文完成,我们就失败了。 这允许我们在工作线程失败一个文件时停止分发工作。
我们defer close(fileChan)
这样,无论我们为什么完成(要么我们分发了所有工作,要么上下文被取消),工作人员都知道传入的工作队列(即fileChan
)上将不再有工作。
我们还需要一个同步机制:一旦所有工作都分发完毕,并且所有结果都在或工作完成被取消,(例如,在我们的 errgroup 的Wait()
返回之后),我们需要关闭我们的结果通道,dataChan
。 这会向结果收集器发出信号,表明没有更多要收集的结果。
var err error // we'll need this later!
go func() {
err = g.Wait()
close(dataChan)
}()
我们不能 - 也不需要 - 把它放在errgroup.Group
. 该函数无法返回错误,并且无法等待自身close(dataChan)
。 所以它进入了一个常规的老套路,没有错误组。
最后,我们可以收集结果。 通过专用的 worker goroutines、一个分发器 goroutine和一个等待工作的 goroutine,并通知不会再有写入dataChan
,我们可以在main
的"主"goroutine中收集所有结果。
for data := range dataChan {
myMap[data.name] = data.bytes
}
if err != nil { // this was set in our final goroutine, remember
fmt.Println("errgroup Error:", err.Error())
}
我做了一些小的更改,以便更容易看到输出。 您可能已经注意到我将文件内容从[]byte
更改为string
。 这纯粹是为了便于阅读结果。 为此,我正在使用encoding/json
来格式化结果,以便于阅读它们并将其粘贴到 SO 中。 这是我经常用来缩进结构化数据的常见模式:
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(myMap); err != nil {
panic(err)
}
终于,我们准备好运行了。 现在我们可以看到许多不同的结果,具体取决于 goroutines 的执行顺序。 但它们都是有效的执行路径。
worker 2 failed to process file2 : file2 cannot be read
worker 0 context error in worker: context canceled
worker 1 context error in worker: context canceled
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3"
}
Program exited.
在此结果中,剩余的工作(file4
和file5
)未添加到通道中。 请记住,未缓冲的通道不存储任何数据。 对于要将这些任务写入通道,必须有工作人员在那里读取它们。 相反,上下文在file2
失败后被取消,并且分布函数遵循其选择中的<-done
路径。file1
和file3
已经处理完毕。
这是一个不同的结果(我只是跑了几次游乐场分享以获得不同的结果)。
worker 1 failed to process file2 : file2 cannot be read
worker 2 processed all work on channel
worker 0 processed all work on channel
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3",
"file4": "these are some bytes for file file4",
"file5": "these are some bytes for file file5",
"file6": "these are some bytes for file file6"
}
在这种情况下,看起来我们的取消有点失败。 但真正发生的事情是,goroutines只是"碰巧"排队并完成其余的工作,然后errorgroup
挑出工人的失败并取消上下文。
错误组的作用
当您使用错误组时,您实际上可以从中得到两件事:
- 轻松访问您的工人返回的第一个错误;
- 获取错误组将在以下情况下为您取消的上下文
请记住,错误组不会取消 goroutine。 起初这让我有点绊倒。 错误组取消上下文。 您有责任将该上下文的状态应用于您的 goroutines(请记住,goroutine必须自行结束,errorgroup
不能结束它)。
关于文件操作上下文和失败的未完成工作的最后一句旁白
大多数文件操作,例如io.Copy
或os.ReadFile
,实际上是后续Read
操作的循环。 但io
和os
并不直接支持上下文。因此,如果您有一个工作线程读取文件,并且您自己没有实现Read
循环,您将没有机会根据上下文取消。 在您的情况下,这可能没问题 - 当然,您可能已经读取了比实际需要更多的文件,但这只是因为发生错误时您已经在读取它们。 我个人会接受这种情况,而不是实现我自己的读取循环。
代码
https://go.dev/play/p/9qfESp_eB-C
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"golang.org/x/sync/errgroup"
)
func main() {
var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
fileChan := make(chan string)
dataChan := make(chan fileData)
g, ctx := errgroup.WithContext(context.Background())
for i := 0; i < 3; i++ {
worker_num := i
g.Go(func() error {
for file := range fileChan {
if err := getBytesFromFile(file, dataChan); err != nil {
fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
return err
} else if err := ctx.Err(); err != nil {
fmt.Println("worker", worker_num, "context error in worker:", err.Error())
return err
}
}
fmt.Println("worker", worker_num, "processed all work on channel")
return nil
})
}
// dispatch files
g.Go(func() error {
defer close(fileChan)
done := ctx.Done()
for _, file := range myFiles {
if err := ctx.Err(); err != nil {
return err
}
select {
case fileChan <- file:
continue
case <-done:
break
}
}
return ctx.Err()
})
var err error
go func() {
err = g.Wait()
close(dataChan)
}()
var myMap = make(map[string]string)
for data := range dataChan {
myMap[data.name] = data.bytes
}
if err != nil {
fmt.Println("errgroup Error:", err.Error())
}
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(myMap); err != nil {
panic(err)
}
}
type fileData struct {
name,
bytes string
}
func getBytesFromFile(file string, dataChan chan fileData) error {
bytes, err := openFileAndGetBytes(file)
if err == nil {
dataChan <- fileData{name: file, bytes: bytes}
}
return err
}
func openFileAndGetBytes(file string) (string, error) {
if file == "file2" {
return "", fmt.Errorf("%s cannot be read", file)
}
return fmt.Sprintf("these are some bytes for file %s", file), nil
}