我正在实施某种缓冲机制:
private static readonly ConcurrentQueue<ProductDto> ProductBuffer = new ConcurrentQueue<ProductDto>();
private async void On_ProductReceived(object sender, ProductReceivedArgs e)
{
ProductBuffer.Enqueue(e.Product);
if (ProductBuffer.Count >= handlerConfig.ProductBufferSize)
{
var products = ProductBuffer.ToList();
ProductBuffer.Clear();
await SaveProducts(products);
}
}
和问题是 - 我是否应该添加某种锁定,以确保不会丢失数据(F.E.其他一些线程将在buffer.tolist((和buffer.clear.clear.clear.clear((,假设上添加product off offuct。或 conturrentqueue 将为我处理所有肮脏的工作?
您可以这样做:
if (ProductBuffer.Count < handlerConfig.ProductBufferSize)
return;
var productsToSave = new List<Product>();
Product dequeued = null;
while(ProductBuffer.TryDequeue(out dequeued))
{
productsToSave.Add(dequeued);
}
SaveProducts(products);
您永远不会Clear
队列。您只是继续拿出东西,直到它是空的。或者,当productsToSave
达到一定尺寸,该列表的过程,然后如果您不想一次保存太多产品,则可以停止将其取出。
这样,是否将新项目添加到队列中并不重要。如果您从队列中阅读时添加它们,他们也会被阅读。如果您停止从队列阅读后添加它们,他们将在那里并下次阅读,然后将队列填满并进行处理。
ConcurrentQueue
的点是您可以添加到它并从多个线程中读取它,而无需lock
。
如果要这样做:
productsToSave = ProductBuffer.ToList();
ProductBuffer.Clear();
然后,您需要lock
(这将破坏目的。(大概是您使用的是ConcurrentQueue
,因为多个线程可能会将项目添加到队列中。如果是这样,那么在这两个语句的执行之间,完全有可能进入队列。它不会添加到列表中,但会被Clear
删除。该项目将丢失。
这就是我的实现方式,我假设您不需要通知保存完成何时完成?
private void On_ProductReceived(object sender, ProductReceivedArgs e)
{
// Variable to hold potential list of products to save
List<Products> productsToSave;
// Lock buffer
lock(ProductBuffer)
{
ProductBuffer.Enqueue(e.Product);
// If it is under size, return immediately
if (ProductBuffer.Count < handlerConfig.ProductBufferSize)
return;
// Otherwise save products, clear buffer, release lock.
productsToSave = ProductBuffer.ToList();
ProductBuffer.Clear();
}
// Save Produts,
SaveProducts(products);
}
如果您获得了1个产品,并且什么都没有得到,您会不想在超时后保存它吗?
我会使用Rx
之类的用例,尤其是IObservable<T>.Buffer(count)