如何让多个 goroutines 读取单个文件的行?

  • 本文关键字:文件 单个 读取 goroutines go
  • 更新时间 :
  • 英文 :


我想读取一个巨大的文件,比如> 1 GB,并让多个 worker goroutines 处理其行。

我担心在使用大量工作线程 goroutines 时,使用单个 goroutine(main(来读取输入行会带来瓶颈。

如何安全地让多个 goroutines 读取文件的行?是否可以将输入文件拆分为多个 chuck,并让每个 goroutine 单独在单独的块上运行?

下面是一个 goroutine 读取输入行的示例代码,由多个 worker goroutine 处理它们:

package main
import (
"bufio"
"fmt"
"log"
"os"
)
func main() {
file, err := os.Open("/path/to/file.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
lines := make(chan string)
for i := 0; i < 100; i++ {
// start 100 workers to process input lines.
// the workers terminate once 'lines' is closed.
go worker(lines)
}
scanner := bufio.NewScanner(file)
go func() {
defer close(lines)
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
...
}

首先,并发读取文件是没有意义的。如果需要对行数据执行复杂的操作,应该做的是按顺序读取文件,并将行的内容发送到每个goroutine。

为了优化此过程,您应该更改不同的东西。您需要更改的第一件事是工人数量。此值不是随机设置的,要实现计算机的最大性能,请使用以下命令:

for i := 0; i < runtime.GOMAXPROCS(0); i++ {
go worker(lines)
}

这样,您将有效地使用计算机可用的CPU。 最后,要处理每行的所有数据,您必须添加:

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer close(lines)
for scanner.Scan() {
wg.Add(1)
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
wg.Done()
}()
wg.Wait()

在工作线程函数中,您还将在末尾添加一个wg.Done()

希望对您有所帮助!