多个消费者在BlockingCollection并发处理?



我有一个BlockingCollection,我从一个线程写入,我从另一个线程读取。生产者线程从服务器接收条目并将它们添加到BlockingCollection中,而读取线程试图清空BlockingCollection并处理它们。

问题是我试图分批清空队列,因为逐个处理它们太慢了。但是当它被不断地写入(数千个项目)时,那么消费者线程会一直读取它们,直到清空,这意味着直到写入完成才会开始处理。

现在,消费者中的处理可以并行完成,所以我一直在想如何实现这一点。

目前我有两个想法:

  1. 从消费者的BlockingCollection中读取一定数量的项目后,启动一个新的并行作业来处理它们,而不是等待队列完全清空然后开始处理。

  2. 使用多个消费者并希望它们能够并行运行,而不是在试图同时读取BlockingCollection时不断地相互阻塞。

所以我的问题是关于选项2 -是BlockingCollection内部优化这种情况?它会对读取的区域进行分区吗?还是消费者会争夺每个条目?如果是这样,那么选项1更好?

添加另一个选项:(绝不是生产就绪的!)

这利用了TPL的数据流,其中BatchBlock<T>为我们抽象了批处理。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
public class HoneyBatcher
{
private const int BATCHSIZE = 10; // Find the size that works best for you.
private readonly BatchBlock<Honey> batchBlock;
private readonly ExecutionDataflowBlockOptions _options = 
new ExecutionDataflowBlockOptions()
{
// I'd start with 1, then benchmark if higher number actually benefits.
MaxDegreeOfParallelism = 1, 
SingleProducerConstrained = true // if so, may micro-optimize throughput
};
// vv Whatever process you want done on a batch
public HoneyBatcher( Action<Honey[]> batchProcessor )
{
// BatchBlock does the batching
// and is the entrypoint to the pipline.
batchBlock = new BatchBlock<Honey>(BATCHSIZE);
// processorBlock processes each batch that batchBlock will produce
// Parallel executions as well as other tweaks can be configured through options.
ActionBlock<Honey[]> processorBlock = 
new ActionBlock<Honey[]>(batchProcessor, _options);
// build the pipline
batchBlock.LinkTo(processorBlock);
// item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
}
// Add item individually and have them batched up
// and processed in a pipeline.
public Task<bool> ProcessAsync(Honey item)
{
return batchBlock.SendAsync(item);
// Can also be done with sync API.
}
}
public class Honey 
{
// Just a dummy
}

请注意,上面的片段只是一个粗略的布局的想法。在生产环境中,您当然要处理错误处理、完成等。

处理成批条目的自然方法是插入BlockingCollection中批量删除它们,而不是试图删除他们后来分批。换句话说,你可以用BlockingCollection<T[]>代替BlockingCollection<T>。生产者线程可以通过使用Queue<T>:

轻松地进行批处理。
var queue = new Queue<T>;
while (someCondition)
{
var item = ProduceItem();
queue.Enqueue(item);
if (queue.Count == batchSize)
{
blockingCollection.Add(queue.ToArray());
queue.Clear();
}
}
if (queue.Count > 0)
{
blockingCollection.Add(queue.ToArray());
queue.Clear();
}
blockingCollection.CompleteAdding();

根据不同的情况,你也可以使用一些linq风格的操作符,比如MoreLinq库中的Batch

最后,回答你的主要问题,是的,BlockingCollection类可以很好地处理多个消费者和多个生产者。在集合为空的情况下,所有的消费者都被阻塞,当一个项目到达时,它被分配给等待的消费者之一。