How to optimize computing hashes for thousands of files



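I have a .NET program that walks a directory containing tens of thousands of relatively small files (about 10 MB each), computes each file's MD5 hash, and stores the data in a SQLite database. The whole process works fine, but it takes a relatively long time (1094353 ms for about 6000 files), and I am looking for ways to optimize it. Here are the solutions I have thought of:

  1. Use additional threads and compute the hashes of several files at the same time. I am not sure how I/O speed would limit me.

  2. Use a better hashing algorithm. I have looked around, and the one I am currently using seems to be the fastest (at least in C#).

Which approach is best, and are there any better ones?

Here is my current code: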

private async Task<string> CalculateHash(string file, System.Security.Cryptography.MD5 md5)
{
    // Run the blocking read and hash on a thread-pool thread.
    Task<string> MD5 = Task.Run(() =>
    {
        using (var stream = new BufferedStream(System.IO.File.OpenRead(file), 1200000))
        {
            var hash = md5.ComputeHash(stream);
            var fileMD5 = string.Concat(Array.ConvertAll(hash, x => x.ToString("X2")));
            return fileMD5;
        }
    });
    return await MD5;
}

public async Task Main()
{
    using (var md5 = System.Security.Cryptography.MD5.Create())
    {
        foreach (var file in Directory.GetFiles(path))
        {
            // Each file is awaited before the next one starts.
            var hash = await CalculateHash(file, md5);
            // Adds `hash` to the database
        }
    }
}

Create a pipeline of work. The easiest way I know to build a pipeline that uses both parts of your code is TPL Dataflow.

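using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Example
{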
    private class Dto
    {
        public Dto(string filePath, byte[] data)
        {
            FilePath = filePath;
            Data = data;
        }
        public string FilePath { get; }
        public byte[] Data { get; }
    }
    public static async Task ProcessFiles(string path)
    {
        // MaxDegreeOfParallelism defaults to 1, so only one thread reads files at a time.
        var getFilesBlock = new TransformBlock<string, Dto>(filePath => new Dto(filePath, File.ReadAllBytes(filePath)));
        var hashFilesBlock = new TransformBlock<Dto, Dto>(dto => HashFile(dto),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount, // We can multi-thread this part.
                BoundedCapacity = 50 // Only allow 50 byte[]s to wait in the queue. It accepts more from getFilesBlock once there is room.
            });
        // MaxDegreeOfParallelism defaults to 1, so we don't need to specify it.
        var writeToDatabaseBlock = new ActionBlock<Dto>(WriteToDatabase,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 50 });
        //Link the blocks together.
        getFilesBlock.LinkTo(hashFilesBlock, new DataflowLinkOptions {PropagateCompletion = true});
        hashFilesBlock.LinkTo(writeToDatabaseBlock, new DataflowLinkOptions {PropagateCompletion = true});
        //Queue the work for the first block.
        foreach (var filePath in Directory.EnumerateFiles(path))
        {
            await getFilesBlock.SendAsync(filePath).ConfigureAwait(false);
        }
        //Tell the first block we are done adding files.
        getFilesBlock.Complete();
        //Wait for the last block to finish processing its last item.
        await writeToDatabaseBlock.Completion.ConfigureAwait(false);
    }
    private static Dto HashFile(Dto dto)
    {
        using (var md5 = System.Security.Cryptography.MD5.Create())
        {
            return new Dto(dto.FilePath, md5.ComputeHash(dto.Data));
        }
    }
    private static async Task WriteToDatabase(Dto arg)
    {
        //Write to the database here.
    }
}

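This creates a pipeline with 3 segments.

The first is single-threaded; it reads the files from the hard drive into memory and stores them as byte[].

The second can use up to Environment.ProcessorCount threads to hash the files. It allows only 50 items to sit in its inbound queue; when the first block tries to add more, it stops processing new items until the next block is ready to accept them.

The third is single-threaded and adds the data to the database; it allows only 50 items in its inbound queue at a time.

Because of the two limits of 50, there will be at most 100 byte[] in memory: 50 in the hashFilesBlock queue and 50 in the writeToDatabaseBlock queue. Items currently being processed count toward the BoundedCapacity limit.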
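To put rough numbers on that bound, here is a small sketch. It assumes the 10 MB file size from the question and a 16-byte MD5 digest; the class and method names are made up for illustration. It also assumes the items waiting in writeToDatabaseBlock carry only the hash (as HashFile returns above), so nearly all of the memory sits in the hashFilesBlock queue. The output buffer of getFilesBlock is not included: as far as I understand, a block without its own BoundedCapacity keeps buffering its results, so it may be worth giving that block a BoundedCapacity as well.

```csharp
using System;

// Hypothetical helper, not part of the pipeline code above.
public static class PipelineMemoryEstimate
{
    // Worst-case bytes held by the two bounded queues.
    // All parameters are illustrative assumptions, not measured values.
    public static long WorstCaseBytes(int hashQueueCapacity, long fileSizeBytes,
                                      int writeQueueCapacity, int hashSizeBytes)
    {
        long fileData = (long)hashQueueCapacity * fileSizeBytes;   // whole files waiting to be hashed
        long hashData = (long)writeQueueCapacity * hashSizeBytes;  // digests waiting to be written
        return fileData + hashData;
    }
}
```

With 50 files of 10 MB each and 50 digests of 16 bytes, `PipelineMemoryEstimate.WorstCaseBytes(50, 10L * 1024 * 1024, 50, 16)` comes to roughly 500 MB, almost all of it file contents.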


Update: For fun, I also wrote a version that reports progress. It is untested and uses C# 7 features.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public static class Example
{
    private class Dto
    {
        public Dto(string filePath, byte[] data)
        {
            FilePath = filePath;
            Data = data;
        }
        public string FilePath { get; }
        public byte[] Data { get; }
    }
    public static async Task ProcessFiles(string path, IProgress<ProgressReport> progress)
    {
        int totalFilesFound = 0;
        int totalFilesRead = 0;
        int totalFilesHashed = 0;
        int totalFilesUploaded = 0;
        DateTime lastReported = DateTime.UtcNow;
        void ReportProgress()
        {
            if (DateTime.UtcNow - lastReported < TimeSpan.FromSeconds(1)) //Try to fire only once a second, but this code is not perfect so you may get a few rapid fire.
            {
                return;
            }
            lastReported = DateTime.UtcNow;
            var report = new ProgressReport(totalFilesFound, totalFilesRead, totalFilesHashed, totalFilesUploaded);
            progress.Report(report);
        }

        var getFilesBlock = new TransformBlock<string, Dto>(filePath =>
        {
            var dto = new Dto(filePath, File.ReadAllBytes(filePath));
            totalFilesRead++; //safe because single threaded.
            return dto;
        });
        var hashFilesBlock = new TransformBlock<Dto, Dto>(inDto =>
            {
                using (var md5 = System.Security.Cryptography.MD5.Create())
                {
                    var outDto = new Dto(inDto.FilePath, md5.ComputeHash(inDto.Data));
                    Interlocked.Increment(ref totalFilesHashed); //Need the interlocked due to multithreaded.
                    ReportProgress();
                    return outDto;
                }
            },
            new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = Environment.ProcessorCount, BoundedCapacity = 50});
        var writeToDatabaseBlock = new ActionBlock<Dto>(arg =>
            {
                //Write to database here.
                totalFilesUploaded++;
                ReportProgress();
            },
            new ExecutionDataflowBlockOptions {BoundedCapacity = 50});
        getFilesBlock.LinkTo(hashFilesBlock, new DataflowLinkOptions {PropagateCompletion = true});
        hashFilesBlock.LinkTo(writeToDatabaseBlock, new DataflowLinkOptions {PropagateCompletion = true});
        foreach (var filePath in Directory.EnumerateFiles(path))
        {
            await getFilesBlock.SendAsync(filePath).ConfigureAwait(false);
            totalFilesFound++;
            ReportProgress();
        }
        getFilesBlock.Complete();
        await writeToDatabaseBlock.Completion.ConfigureAwait(false);
        ReportProgress();
    }
}
public class ProgressReport
{
    public ProgressReport(int totalFilesFound, int totalFilesRead, int totalFilesHashed, int totalFilesUploaded)
    {
        TotalFilesFound = totalFilesFound;
        TotalFilesRead = totalFilesRead;
        TotalFilesHashed = totalFilesHashed;
        TotalFilesUploaded = totalFilesUploaded;
    }
    public int TotalFilesFound { get; }
    public int TotalFilesRead { get; }
    public int TotalFilesHashed { get; }
    public int TotalFilesUploaded { get; }
}
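ReportProgress in the version above is called from several threads, and lastReported is read and written without synchronization, which is probably why the comment warns about occasional rapid-fire reports. A minimal sketch of one way to tighten that, assuming a small helper (ReportThrottle is a made-up name, not part of the code above) that stores the last report time as ticks and updates it with Interlocked.CompareExchange:

```csharp
using System;
using System.Threading;

// Hypothetical helper: lets at most one caller per interval through,
// even when called from several threads at once.
public sealed class ReportThrottle
{
    private readonly long _intervalTicks;
    private long _lastTicks; // ticks of the last permitted report

    public ReportThrottle(TimeSpan interval, DateTime startUtc)
    {
        _intervalTicks = interval.Ticks;
        _lastTicks = startUtc.Ticks;
    }

    // Returns true for exactly one caller once the interval has elapsed.
    public bool TryAcquire(DateTime nowUtc)
    {
        long last = Interlocked.Read(ref _lastTicks);
        if (nowUtc.Ticks - last < _intervalTicks)
        {
            return false;
        }
        // Only the thread that swaps in the new timestamp gets to report.
        return Interlocked.CompareExchange(ref _lastTicks, nowUtc.Ticks, last) == last;
    }
}
```

Inside ReportProgress, the time check could then become `if (!throttle.TryAcquire(DateTime.UtcNow)) return;`. The counters are still read without synchronization, so a report may be slightly stale.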
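
As far as I understand, Task.Run hands each file to a thread-pool thread (it does not necessarily create a new thread per file), and because your loop awaits each task before starting the next, nothing actually runs in parallel. What you describe sounds like a good case for Parallel.ForEach.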

public void CalcHashes(string path)
{
    string GetFileHash(System.Security.Cryptography.MD5 md5, string fileName)
    {
        using (var stream = new BufferedStream(System.IO.File.OpenRead(fileName), 1200000))
        {
            var hash = md5.ComputeHash(stream);
            var fileMD5 = string.Concat(Array.ConvertAll(hash, x => x.ToString("X2")));
            return fileMD5;
        }
    }
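    ParallelOptions options = new ParallelOptions();
    options.MaxDegreeOfParallelism = 8;
    Parallel.ForEach(Directory.EnumerateFiles(path), options, fileName =>
    {
        // Hash algorithm instances are not thread-safe, so each iteration creates its own.
        using (var md5 = System.Security.Cryptography.MD5.Create())
        {
            string hash = GetFileHash(md5, fileName);
            // Store `hash` here, e.g. write it to the database.
        }
    });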
}

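Edit: It seems Parallel.ForEach does not actually do the partitioning automatically, so I added a maximum degree of parallelism of 8. Result: 107005 files in 46628 ms.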
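To make that result easier to compare, it helps to convert it to files per second. The helper below is only an illustrative sketch (the Throughput class is a made-up name). Keep in mind that the file sizes and hardware behind this run are not stated, so the comparison with the question's timing is rough at best.

```csharp
using System;

// Hypothetical helper for turning a timing result into a rate.
public static class Throughput
{
    // Files processed per second, given a file count and elapsed milliseconds.
    public static double FilesPerSecond(long fileCount, long elapsedMilliseconds)
    {
        if (elapsedMilliseconds <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(elapsedMilliseconds));
        }
        return fileCount * 1000.0 / elapsedMilliseconds;
    }
}
```

`Throughput.FilesPerSecond(107005, 46628)` gives roughly 2295 files per second, while the question's figures, `Throughput.FilesPerSecond(6000, 1094353)`, give roughly 5.5 files per second.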
