并发文件系统扫描



我想为目录中的文件获得文件信息(字节中的文件名& size(。但是有很多子目录(〜1000(和文件(〜40 000(。

实际上我的解决方案是使用filepath.walk((获取每个文件的文件信息。但这很长。

func visit(path string, f os.FileInfo, err error) error {
    if f.Mode().IsRegular() {
        fmt.Printf("Visited: %s File name: %s Size: %d bytesn", path, f.Name(), f.Size())
    }
    return nil
}
func main() {
    flag.Parse()
    root := "C:/Users/HERNOUX-06523/go/src/boilerpipe" //flag.Arg(0)
    filepath.Walk(root, visit)
}

是否可以使用filepath.walk((?

进行并行/并发处理

您可以通过修改visit()函数以不进入子文件夹,而是为每个子文件夹启动新的Goroutine。

为了做到这一点,如果条目是目录,请返回visit()函数中的特殊filepath.SkipDir错误。不要忘记检查visit()中的path是否是子文件夹,应该处理Goroutine,因为这也将其传递给visit(),如果没有此检查,您将无休止地启动Goroutines。

您还需要某种"计数器",即在后台仍在工作多少goroutines,因为您可以使用sync.WaitGroup

这是一个简单的实现:

var wg sync.WaitGroup
func walkDir(dir string) {
    defer wg.Done()
    visit := func(path string, f os.FileInfo, err error) error {
        if f.IsDir() && path != dir {
            wg.Add(1)
            go walkDir(path)
            return filepath.SkipDir
        }
        if f.Mode().IsRegular() {
            fmt.Printf("Visited: %s File name: %s Size: %d bytesn",
                path, f.Name(), f.Size())
        }
        return nil
    }
    filepath.Walk(dir, visit)
}
func main() {
    flag.Parse()
    root := "folder/to/walk" //flag.Arg(0)
    wg.Add(1)
    walkDir(root)
    wg.Wait()
}

一些注释:

取决于子文件夹中文件的"分布",这可能无法完全利用您的CPU/存储,就好像为一个子文件夹中的所有文件中的99%一样,Goroutine仍然需要大部分时间。<<<<<<<<<<<<<<<<<<<<

还请注意,fmt.Printf()调用已序列化,因此这也会减慢过程。我认为这只是一个例子,实际上您将在内存中进行某种处理/统计信息。不要忘记也可以保护从visit()函数访问的变量并发访问。

不必担心大量的子文件夹。这是正常的,并且运行时能够处理数十万个goroutines。

还要注意,性能瓶颈很可能是您的存储/硬盘速度,因此您可能无法获得所需的性能。一定的一点(硬盘限制(之后,您将无法提高性能。

还为每个子文件夹推出一个新的Goroutine可能不是最佳的,可能是通过限制goroutines步行文件夹的数量来获得更好的性能。为此,请查看并使用工人池:

这是GO中的惯用工人线程池吗?

我解决此问题的版本

func ConcurrentDirWalker(wg *sync.WaitGroup, dir string, paths chan<- string) {
    walker := func(path string, info os.FileInfo, err error) error {
        if info.IsDir() && path != dir {
            wg.Add(1)
            go func() {
                defer wg.Done()
                ConcurrentDirWalker(wg, path, paths)
            }()
            return filepath.SkipDir
        }
        if info.Mode().IsRegular(){
            paths <- path
        }
        return nil
    }
    filepath.Walk(dir, walker)
}
collectorCh := make(chan string, 1000)
files := make([]string, 0)
var wgCollector sync.WaitGroup
wgCollector.Add(1)
go func() {
    defer wgCollector.Done()
    for path := range collectorCh {
        files = append(files, path)
    }
}()
root := "/users/me/dev/"
var wgWorkers sync.WaitGroup
wgWorkers.Add(1)
go func() {
    defer wgWorkers.Done()
    ConcurrentDirWalker(&wgWorkers, root, collectorCh)
}()
wgWorkers.Wait()
close(collectorCh)
wgCollector.Wait()

相关内容

最新更新