How to perform asynchronous operations in TPL Dataflow for optimal performance



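I wrote the method below to batch-process a huge CSV file. The idea is to read chunks of lines from the file into memory and then partition each chunk into fixed-size batches. Once we have the partitions, they are sent to a server (synchronously or asynchronously), which can take a while.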

private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
    List<string> chunk = new List<string>(chunkSize);
    foreach (var line in File.ReadLines(filePath))
    {
        if (chunk.Count == chunk.Capacity)
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
            // Further breakdown the chunks into batch size groups
            var groups = partitions.Select(x => x.Select((i, index) =>
                new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
            // Get batches from groups
            var batches = groups.SelectMany(x => x)
                .Select(y => y.Select(z => z)).ToList();
            // Process all batches asynchronously
            batches.AsParallel().ForAll(async b =>
            {
                using (var client = new WebClient())
                {
                    // Join the batch's lines into one newline-separated payload
                    byte[] bytes = System.Text.Encoding.ASCII
                        .GetBytes(string.Join(Environment.NewLine, b));
                    await client.UploadDataTaskAsync("myserver.com", bytes);
                }
            });
            // clear the chunk
            chunk.Clear();
        }
        chunk.Add(line);
    }
}
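The chunk-to-batch step in the method above (group on column 1, then cut each group into batches of batchSize) can be isolated and checked on its own. The following is a minimal sketch; CsvBatching and ToBatches are hypothetical names, and the logic only mirrors the GroupBy/index arithmetic from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CsvBatching
{
    // Hypothetical helper mirroring the chunk-to-batch step of BatchProcess:
    // group lines on the first comma-separated column, then cut each group
    // into batches of at most batchSize lines.
    public static List<List<string>> ToBatches(IEnumerable<string> chunk, int batchSize)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        return chunk
            .GroupBy(line => line.Split(',')[0])           // partition on column 1
            .SelectMany(group => group
                .Select((line, index) => (line, index))     // position within the group
                .GroupBy(x => x.index / batchSize, x => x.line)
                .Select(batch => batch.ToList()))
            .ToList();                                      // materialize once
    }
}

public static class Program
{
    public static void Main()
    {
        var chunk = new[] { "a,1", "a,2", "a,3", "b,1" };
        foreach (var batch in CsvBatching.ToBatches(chunk, batchSize: 2))
            Console.WriteLine(string.Join(" | ", batch));
    }
}
```

With batchSize 2 this prints "a,1 | a,2", then "a,3", then "b,1": the "a" group is split into two batches and "b" gets its own. On .NET 6 or later, group.Chunk(batchSize) should give the same per-group split more directly.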

This code doesn't seem very efficient, for two reasons:

  1. The main thread that reads the CSV file is blocked until all partitions have been processed.

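  2. AsParallel blocks until all tasks complete. So if the thread pool has more threads available to do work, I'm not using them, because the number of tasks is bounded by the number of partitions.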

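batchSize is fixed and can't be changed, but chunkSize can be tuned for performance. I can pick a chunkSize large enough that the number of batches created >> the number of threads available in the system, but the parallel loop (Parallel.ForEach, or AsParallel().ForAll as written above) still blocks until all tasks complete.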

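How can I modify the code so that all available threads in the system are used to do the work instead of sitting idle? I thought I could use a BlockingCollection to store the batches, but I'm not sure what capacity to give it, since the number of batches in each chunk is dynamic.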
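On the capacity question: the bound of a BlockingCollection only limits how many batches may wait in memory at once; it does not have to match the number of batches per chunk, because Add simply blocks the producer while the collection is full. A minimal producer/consumer sketch of that idea follows. BoundedQueueDemo is a hypothetical name, and Thread.Sleep stands in for a blocking upload (which still ties up each consumer thread, the concern the answer below raises).

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedQueueDemo
{
    // Runs one producer and consumerCount consumers over a bounded queue.
    // capacity caps memory use; it is unrelated to batches per chunk.
    public static int Run(int batchCount, int capacity, int consumerCount)
    {
        int processed = 0;
        using var queue = new BlockingCollection<string[]>(boundedCapacity: capacity);

        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                // Ends once CompleteAdding is called and the queue is drained.
                foreach (var batch in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(10); // stand-in for a blocking upload
                    Interlocked.Increment(ref processed);
                }
            }))
            .ToArray();

        for (int i = 0; i < batchCount; i++)
            queue.Add(new[] { $"line {i}" }); // blocks while the queue is full

        queue.CompleteAdding();
        Task.WaitAll(consumers);
        return processed;
    }
}

public static class Program
{
    public static void Main() =>
        Console.WriteLine(BoundedQueueDemo.Run(batchCount: 50, capacity: 8, consumerCount: 4));
}
```

This prints 50: every batch is consumed even though at most 8 are ever queued at once.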

Any ideas on how to use TPL to maximize thread utilization, so that most of the available threads on the system are always doing something?

Update: here is what I have so far using TPL Dataflow. Is this correct?

private static void UploadData(string filePath, int chunkSize, int batchSize)
{
    var buffer1 = new BatchBlock<string>(chunkSize);
    var buffer2 = new BufferBlock<IEnumerable<string>>();
    var action1 = new ActionBlock<string[]>(t =>
    {
        Console.WriteLine("Got a chunk of lines " + t.Count());
        // Partition each chunk into smaller chunks grouped on column 1
        var partitions = t.GroupBy(c => c.Split(',')[0], (key, g) => g);
        // Further breakdown the chunks into batch size groups
        var groups = partitions.Select(x => x.Select((i, index) =>
            new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
        // Get batches from groups
        var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
        foreach (var batch in batches)
        {
            buffer2.Post(batch);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    buffer1.LinkTo(action1, new DataflowLinkOptions
        { PropagateCompletion = true });
    var action2 = new TransformBlock<IEnumerable<string>,
        IEnumerable<string>>(async b =>
    {
        await ExecuteBatch(b);
        return b;
    }, new ExecutionDataflowBlockOptions
        { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    buffer2.LinkTo(action2, new DataflowLinkOptions
        { PropagateCompletion = true });
    var action3 = new ActionBlock<IEnumerable<string>>(b =>
    {
        Console.WriteLine("Finished executing a batch");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    action2.LinkTo(action3, new DataflowLinkOptions
        { PropagateCompletion = true });
    Task produceTask = Task.Factory.StartNew(() =>
    {
        foreach (var line in File.ReadLines(filePath))
        {
            buffer1.Post(line);
        }
        //Once marked complete your entire data flow will signal a stop for
        // all new items
        Console.WriteLine("Finished producing");
        buffer1.Complete();
    });
    Task.WaitAll(produceTask);
    Console.WriteLine("Produced complete");
    action1.Completion.Wait();//Will add all the items to buffer2
    Console.WriteLine("Action1 complete");
    buffer2.Complete();//will not get any new items
    action2.Completion.Wait();//Process the batch of 5 and then complete
    action3.Completion.Wait();
    Console.WriteLine("Process complete");
    Console.ReadLine();
}

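You're pretty close. In TPL Dataflow, data flows from one block to another, and you should try to stay within that paradigm. For example, action1 should be a TransformManyBlock, because an ActionBlock is only an ITargetBlock (i.e. a terminal block).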
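As a rough sketch of that suggestion, assuming the System.Threading.Tasks.Dataflow library is available (built into modern .NET, a NuGet package on .NET Framework): the ActionBlock plus manual buffer2.Post pair becomes a single TransformManyBlock that takes one chunk and emits many batches. TransformManyDemo is a hypothetical name, and the column-1 grouping is left out to keep the focus on the block wiring.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static class TransformManyDemo
{
    public static List<string> Run(string[] lines, int chunkSize, int batchSize)
    {
        var results = new List<string>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // string -> string[] chunks of chunkSize lines (the last chunk may be smaller).
        var chunker = new BatchBlock<string>(chunkSize);

        // One chunk in, many batches out. Column-1 grouping omitted for brevity.
        var split = new TransformManyBlock<string[], string[]>(chunk =>
            Enumerable.Range(0, (chunk.Length + batchSize - 1) / batchSize)
                      .Select(i => chunk.Skip(i * batchSize).Take(batchSize).ToArray())
                      .ToList());

        // Terminal block; MaxDegreeOfParallelism defaults to 1, so List.Add is safe here.
        var sink = new ActionBlock<string[]>(batch => results.Add(string.Join(";", batch)));

        chunker.LinkTo(split, link);
        split.LinkTo(sink, link);

        foreach (var line in lines) chunker.Post(line);
        chunker.Complete();      // also flushes the final partial chunk
        sink.Completion.Wait();
        return results;
    }
}

public static class Program
{
    public static void Main()
    {
        var lines = Enumerable.Range(1, 7).Select(i => $"k,{i}").ToArray();
        foreach (var batch in TransformManyDemo.Run(lines, chunkSize: 4, batchSize: 3))
            Console.WriteLine(batch);
    }
}
```

With 7 lines, chunkSize 4 and batchSize 3, this prints "k,1;k,2;k,3", "k,4" and "k,5;k,6;k,7": the first chunk yields two batches and the smaller final chunk yields one.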

When you set PropagateCompletion on a link, completion is routed through the blocks automatically, so you only need to call Wait() once, on the last block.

Think of it as a chain of dominoes: you call Complete() on the first block, and completion propagates down the chain to the last block.
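A minimal sketch of the domino chain, assuming the System.Threading.Tasks.Dataflow library is available; CompletionDemo is a hypothetical name. Only the first block is completed and only the last block is waited on.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    public static int Run(int count)
    {
        int received = 0;
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var first = new BufferBlock<int>();
        var second = new TransformBlock<int, int>(x => x * 2);
        var last = new ActionBlock<int>(x => received++); // single-threaded by default

        first.LinkTo(second, link);
        second.LinkTo(last, link);

        for (int i = 0; i < count; i++) first.Post(i);

        first.Complete();        // knock over the first domino...
        last.Completion.Wait();  // ...and wait only on the last one
        return received;
    }
}

public static class Program
{
    public static void Main() => Console.WriteLine(CompletionDemo.Run(10));
}
```

This prints 10: last.Completion only finishes after every item has passed through all three blocks. A fault in an upstream block would propagate the same way and surface from that single Wait().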

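You should also think about what you are multithreading and why. Your example is heavily I/O-bound, and I don't think tying up a bunch of threads waiting for I/O to complete is the right solution.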
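For I/O-bound work, an async delegate lets the block keep many uploads in flight without a thread sitting blocked on each one. MaxDegreeOfParallelism then caps concurrent requests rather than threads. The following is a minimal sketch under the assumption that the real upload is awaitable; UploadAsync is a hypothetical stand-in implemented with Task.Delay, and AsyncUploadDemo is likewise a hypothetical name.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncUploadDemo
{
    // Hypothetical stand-in for the real network call (e.g. an HttpClient POST).
    static Task UploadAsync(string[] batch) => Task.Delay(200);

    public static int Run(int batchCount, int maxConcurrentUploads)
    {
        int completed = 0;
        var upload = new ActionBlock<string[]>(async batch =>
        {
            await UploadAsync(batch);   // no thread is blocked while this is pending
            Interlocked.Increment(ref completed);
        }, new ExecutionDataflowBlockOptions
        {
            // Caps concurrent requests; it is not a thread count.
            MaxDegreeOfParallelism = maxConcurrentUploads
        });

        for (int i = 0; i < batchCount; i++)
            upload.Post(new[] { $"batch {i}" });

        upload.Complete();
        upload.Completion.Wait();
        return completed;
    }
}

public static class Program
{
    public static void Main()
    {
        var sw = Stopwatch.StartNew();
        int n = AsyncUploadDemo.Run(batchCount: 40, maxConcurrentUploads: 10);
        Console.WriteLine($"{n} uploads in {sw.ElapsedMilliseconds} ms");
    }
}
```

With 40 batches and at most 10 in flight, the elapsed time should be on the order of four 200 ms rounds rather than forty, while only a few pool threads are ever busy. A bounded value like this is usually safer than Unbounded for real HTTP calls.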

Finally, pay attention to what does and does not block. In your example, buffer1.Post(...) is not a blocking call, so there is no reason to put it in a task.
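A related detail: Post never waits. Without a BoundedCapacity it always accepts, so the whole file can end up buffered in memory. With a BoundedCapacity it returns false when the block is full. SendAsync is the call that waits asynchronously for room. A minimal sketch, assuming the System.Threading.Tasks.Dataflow library; BackpressureDemo is a hypothetical name, and a TaskCompletionSource holds the consumer so the block stays full.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureDemo
{
    public static async Task<(bool postAccepted, int processed)> RunAsync()
    {
        int processed = 0;
        // Holds the consumer until released, so the block stays full.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var slow = new ActionBlock<int>(async _ =>
        {
            await gate.Task;
            processed++;
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        await slow.SendAsync(1);              // takes the only slot
        bool postAccepted = slow.Post(2);     // block is full: Post returns false immediately

        Task<bool> pending = slow.SendAsync(3); // waits for room instead of failing
        gate.SetResult(true);                   // let the consumer drain
        await pending;

        slow.Complete();
        await slow.Completion;
        return (postAccepted, processed);
    }
}

public static class Program
{
    public static async Task Main()
    {
        var (postAccepted, processed) = await BackpressureDemo.RunAsync();
        Console.WriteLine($"Post accepted: {postAccepted}, processed: {processed}");
    }
}
```

Item 2 is rejected by Post and item 3 is delivered by SendAsync, so two items are processed. In a file-reading loop, awaiting SendAsync on a bounded first block is one way to throttle the reader to the speed of the uploads.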

I wrote the following sample code using TPL Dataflow:

static void Main(string[] args)
{
    var filePath = @"C:\test.csv"; // verbatim string: "\t" would otherwise be a tab
    var chunkSize = 1024;
    var batchSize = 128;
    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };
    var uploadData = new ActionBlock<IEnumerable<string>>(
        async (data) =>
        {
            WebClient client = new WebClient();
            var payload = string.Join(Environment.NewLine, data); // keep line boundaries
            byte[] bytes = System.Text.Encoding.ASCII.GetBytes(payload);
            //await client.UploadDataTaskAsync("myserver.com", bytes);
            await Task.Delay(2000);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded /* Prefer to limit that to some reasonable value */ });
    var lineBuffer = new BatchBlock<string>(chunkSize);
    var splitData = new TransformManyBlock<IEnumerable<string>, IEnumerable<string>>(
        (data) =>
        {
            // Partition each chunk into smaller chunks grouped on column 1
            var partitions = data.GroupBy(c => c.Split(',')[0]);
            // Further break down the chunks into batch-size groups
            var groups = partitions.Select(x => x.Select((i, index) => new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
            // Get batches from groups
            var batches = groups.SelectMany(x => x).Select(y => y.Select(z => z));
            // Don't forget to enumerate before returning
            return batches.ToList();
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    lineBuffer.LinkTo(splitData, linkCompletion);
    splitData.LinkTo(uploadData, linkCompletion);
    foreach (var line in File.ReadLines(filePath))
    {
        lineBuffer.Post(line);
    }
    lineBuffer.Complete();
    // Wait for uploads to finish
    uploadData.Completion.Wait();
}

Latest update