多个线程编写为队列,一个线程写入C#中的队列文件



在控制台应用程序中,我正在通过同一文件夹读取数十万个文件(每个文件(。〜150 GB(。循环的每个任务都会生成一个我需要写入磁盘的对象。为了避免生成数千个输出文件,我正在寻找一种将所有结果写入同一文件的方法。

锁定线程不是一个选项,因为我需要极端性能,并且将所有内容保持在记忆中是不可行的。

我不熟悉多任务处理,除了 parallel ... loops,我找不到任何相关的内容。

我如何创建一个排队系统(输出顺序不重要(, parallel.foreach loop将其输出写入到以队列的对象并将其附加到磁盘上的一个大文件。

这大约是我到目前为止所拥有的

static void ParseData(string directory, MyWriter writer)
{
    string[] files  = Directory.GetFiles(repository);
    Parallel.ForEach(file, files =>
    {
        object obj = GenerateObject(file);
        writer.AddToQueue(obj);
    }
    writer.NothingMoreToAdd = true;
}
class MyWriter : TextWriter
{
    private ConcurrentQueue<object> _queue;
    public bool NothingMoreToAdd {get; set;}
    public MyWriter()
        : base()
    {
         this.NothingMoreToAdd = false;
    }
    public AddToQueue(object obj)
    {
        this._queue.Enqueue(obj);
    }
    // Function to set as asynchronous and to last until this.NothingMoreToAdd is set to true
    public WriteToFile(string file)
    {
        using (StreamWriter writer = new StreamWriter(file))
            while (!this.NothingMoreToAdd) // until queue is not set to end ...
            {
                if (this._queue.Count > 0)
                {
                    object obj;
                    if (this._queue.TryDequeue(out obj))
                        writer.Write(obj);
                }
                // maybe Thread.Sleep(20);
            }
    }
}

将要实现mywriter对象,是该应用程序的主要内容及其在另一个线程中调用的writetofile方法。然后,可以在主线程中启动parsedata方法,并在将moretoadd设置为true时结束Writetofile方法。

如果您对如何管理和写入磁盘的排队有任何建议。

这是快速而肮脏的东西。当然,您可以更改它以适合您的需求。但基本上,它完成了您要求的内容:parallelly读取多个文件并与它们做点事,并将结果添加到队列中并将结果持续到文件

  private static void Main(string[] args)
        {
            string outputFile = @"E:outputfile.txt";
            ConcurrentQueue<object> queue = new ConcurrentQueue<object>();
            string[] files = Directory.GetFiles(@"E:100D3100", "*.*", SearchOption.TopDirectoryOnly);
            bool isCompleted = false;
            Task t1 = new Task(() =>
            {
                Parallel.ForEach(files, file =>
                    {
                        queue.Enqueue(file);
                    });
                isCompleted = true;
            });
            t1.Start();
            Task t2 = new Task(() =>
               {
                   object file = new object();
                   while (isCompleted != true)
                   {
                       queue.TryDequeue(out file);
                       if (file != null)
                       {
                           string[] text = File.ReadAllLines(file.ToString());
                           File.AppendAllLines(outputFile, text);
                       }
                   }
                   foreach (var item in queue)
                   {
                       string[] text = File.ReadAllLines(file.ToString());
                       File.AppendAllLines(outputFile, text);
                   }
               });
            t2.Start();
            Task.WhenAll(t1, t2).Wait();
        }

旁注:并行I/O非常依赖硬件,尤其是在磁盘上完成时。

最新更新