<string> 具有多线程/任务的并发队列?



我如何将多线程添加到并发队列中,目前我目前正在 1 个线程上使用 Concurrentqueue 处理文本文件,但如果我想在多个线程上运行它以减少整体处理时间怎么办?

当前方法示例 -

    private static ConcurrentQueue<string> queue;
    static void Main(string[] args)
    {
        queue = new ConcurrentQueue<string>(System.IO.File.ReadAllLines("input.txt"));
        Process();
    }
    static void Process()
    {
        while (queue.Count > 0)
        {
            string entry;
            if (queue.TryDequeue(out entry))
            {
                Console.WriteLine(entry);
                log("out.txt", entry);
            }
        }
    }
    private static void log(string file, string data)
    {
        using (StreamWriter writer = System.IO.File.AppendText(file))
        {
            writer.WriteLine(data);
            writer.Flush();
            writer.Close();
        }
    }

代码分解 -

queue = new ConcurrentQueue<string>(System..) // assigns queue to a text file
Process(); // Executes the Process method

static void Process() {
    while ... // runs a loop whilst queue.count is not equal to 0
    if (queueTryDequeue... // takes one line from queue and assigns it to 'string entry'
    Console.. // Writes 'entry' to console
    log.. // adds 'string entry' to a new line inside 'out.txt'

例如.txt input包含1000个条目,我想创建10个线程,这些线程从input中获取一个条目.txt并对其进行处理,同时避免使用相同的条目/复制与另一个线程相同的进程。我将如何实现这一目标?

你应该使用并行循环:

注意:它不会按原始顺序循环项目!

private static StreamWriter logger;
static void Main(string[] args)
{
    // Store your entries from a file in a queue.
    ConcurrentQueue<string> queue = new ConcurrentQueue<string>(System.IO.File.ReadAllLines("input.txt"));
    // Open StreamWriter here.
    logger = File.AppendText("log.txt");
    // Call process method.
    ProcessParallel(queue);
    // Close the StreamWriter after processing is done.
    logger.Close();
}
static void ProcessParallel(ConcurrentQueue<string> collection)
{
    ParallelOptions options = new ParallelOptions()
    {
        // A max of 10 threads can access the file at one time.
        MaxDegreeOfParallelism = 10
    };
    // Start the loop and store the result, so we can check if all the threads are done.
    // The Parallel.For will do all the mutlithreading for you!
    ParallelLoopResult result = Parallel.For(0, collection.Count, options, (i) =>
    {
        string entry;
        if (collection.TryDequeue(out entry))
        {
            Console.WriteLine(entry);
            log(entry);
        }
    });
    // Parallel.ForEach can also be used.
    // Block the main thread while it is still processing the entries...
    while (!result.IsCompleted) ;
    // Every thread is done
    Console.WriteLine("Multithreaded loop is done!");
}
private static void log(string data)
{
    if (logger.BaseStream == null)
    {
        // Cannot log, because logger.Close(); was called.
        return;
    }
    logger.WriteLine(data);
}

最新更新