线程安全缓冲区



我正在实施某种缓冲机制:

private static readonly ConcurrentQueue<ProductDto> ProductBuffer = new ConcurrentQueue<ProductDto>();
private async void On_ProductReceived(object sender, ProductReceivedArgs e)
{
    ProductBuffer.Enqueue(e.Product);
    if (ProductBuffer.Count >= handlerConfig.ProductBufferSize)
    {
        var products = ProductBuffer.ToList();
        ProductBuffer.Clear();
        await SaveProducts(products);
    }
}

和问题是 - 我是否应该添加某种锁定,以确保不会丢失数据(F.E.其他一些线程将在buffer.tolist((和buffer.clear.clear.clear.clear((,假设上添加product off offuct。或 conturrentqueue 将为我处理所有肮脏的工作?

您可以这样做:

if (ProductBuffer.Count < handlerConfig.ProductBufferSize)
    return;
var productsToSave = new List<Product>();
Product dequeued = null;
while(ProductBuffer.TryDequeue(out dequeued))
{
    productsToSave.Add(dequeued);
}
SaveProducts(products);

您永远不会Clear队列。您只是继续拿出东西,直到它是空的。或者,当productsToSave达到一定尺寸,该列表的过程,然后如果您不想一次保存太多产品,则可以停止将其取出。

这样,是否将新项目添加到队列中并不重要。如果您从队列中阅读时添加它们,他们也会被阅读。如果您停止从队列阅读后添加它们,他们将在那里并下次阅读,然后将队列填满并进行处理。

ConcurrentQueue的点是您可以添加到它并从多个线程中读取它,而无需lock


如果要这样做:

    productsToSave = ProductBuffer.ToList();
    ProductBuffer.Clear();

然后,您需要lock(这将破坏目的。(大概是您使用的是ConcurrentQueue,因为多个线程可能会将项目添加到队列中。如果是这样,那么在这两个语句的执行之间,完全有可能进入队列。它不会添加到列表中,但会被Clear删除。该项目将丢失。

这就是我的实现方式,我假设您不需要通知保存完成何时完成?

private void On_ProductReceived(object sender, ProductReceivedArgs e)
{
    // Variable to hold potential list of products to save
    List<Products> productsToSave;
    // Lock buffer
    lock(ProductBuffer)
    {
        ProductBuffer.Enqueue(e.Product);
        // If it is under size, return immediately
        if (ProductBuffer.Count < handlerConfig.ProductBufferSize)
            return;
        // Otherwise save products, clear buffer, release lock.
        productsToSave = ProductBuffer.ToList();
        ProductBuffer.Clear();
    }
    // Save Produts, 
    SaveProducts(products);
}

如果您获得了1个产品,并且什么都没有得到,您会不想在超时后保存它吗?

我会使用Rx之类的用例,尤其是IObservable<T>.Buffer(count)

相关内容

  • 没有找到相关文章

最新更新