通用列表并发访问-在存储数据时清除列表的一部分



我有一个通用的List<T>,其中来自web套接字的实时流数据正在被存储。我想把通用列表中的数据存储到数据库中,然后清空列表,这样就可以存储新的流数据,而不需要占用机器的内存。

如果我枚举列表发送数据到数据库,我得到异常,因为数据被添加到列表,而我试图枚举或清除列表。如果我在列表上应用锁,流数据将暂停,这是不允许的。

请建议我如何解决这个问题。

似乎是BatchBlock的工作

它是完全线程安全的,非常适合于数据流。在DataFlow . net库中有很多类,但最适合您的是BatchBlock

BatchBlock收集数据,直到达到大小阈值。当满足时,整批就是结果。您可以通过不同的方式获得结果,例如.ReceiveReceiveAll或它们的异步对应项。另一种方法是将批处理结果链接到另一个块,如ActionBlock,每次输入从源块(BatchBlock在这种情况下)提供给它时,它将异步调用提供的Action,所以基本上每次批处理得到满时,它都被发送到ActionBlock。ActionBlock可以接收一个类似MaxDegreeOfParallelism的参数,以避免数据库锁定或smth,但它不会以任何方式阻塞BatchBlock,因此客户端不需要等待,批次将被简单地放在队列中(线程安全)等待ActionBlock执行。

并且不用担心,当批处理满时,它也不会停止接收新项目,因此不会再次阻塞。一个漂亮的解决方案。

需要担心的一件事是,如果批处理没有达到完整大小,但您停止应用程序,结果将丢失,因此您可以手动TriggerBatch将批处理中的项目发送到ActionBlock。因此,您可以在Dispose或smth中调用TriggerBatch,由您决定。

BatchBlock中也有两种输入方式:PostSendAsync。我相信Post正在阻塞(尽管我不确定),但如果BatchBlock忙,SendAsync将推迟消息。

class ConcurrentCache<T> : IAsyncDisposable {
private readonly BatchBlock<T>    _batchBlock;
private readonly ActionBlock<T[]> _actionBlock;
private readonly IDisposable      _linkedBlock;
public ConcurrentCache(int cacheSize) {
_batchBlock = new BatchBlock<T>(cacheSize);
// action to do when the batch max capacity is met
// the action can be an async task
_actionBlock = new ActionBlock<T[]>(ReadBatchBlock);
_linkedBlock = _batchBlock.LinkTo(_actionBlock);
}
public async Task SendAsync(T item) {
await _batchBlock.SendAsync(item);
}
private void ReadBatchBlock(T[] items) {
foreach (var item in items) {
Console.WriteLine(item);
}
}
public async ValueTask DisposeAsync() {
_batchBlock.Complete();
await _batchBlock.Completion;
_batchBlock.TriggerBatch();
_actionBlock.Complete();
await _actionBlock.Completion;
_linkedBlock.Dispose();
}
}

使用例子:

await using var cache = new ConcurrentCache<int>(5);
for (int i = 0; i < 12; i++) {
await cache.SendAsync(i);
await Task.Delay(200);
}

当对象将被处置时,剩余的批次将被触发并打印。


更新

正如@TheodorZoulias所指出的,如果批处理没有被填满,并且很长一段时间没有消息,那么消息就会卡在BatchBlock中。解决方案是创建一个计时器来调用.TriggerBatch()

如果我在列表上应用锁,流数据将暂停,这是不允许的

您应该只持有尽可能短的时间。在本例中,应该是从列表中添加或删除一个项目。在向数据库添加数据或进行任何其他慢速操作时,不应该持有锁。取一个无争用锁的时间大约是25ns,这应该只在非常紧密的循环中出现问题。

但是一个更好的选择是使用内置的线程安全集合,比如BlockingCollection。后者非常方便,因为它有GetConsumingEnumerableCompleteAdding这样的方法。这让你的消费者只需使用常规的foreach循环来消费物品,而生产者只需调用CompleteAdding,让循环在所有物品都被处理完后退出。

您可能还想看看DataFlow。我自己没有使用过它,但它似乎适合于设置并发处理管道。

然而,在尝试进行任何类型的并发处理之前,您需要相当熟悉线程安全和所涉及的危险。线程安全是困难的,您需要知道做什么是安全的,做什么是不安全的。当你搞砸时,你不会总是那么幸运地得到一个异常,你可能只是得到丢失或不正确的数据。

我想你应该试试Parallel。ForEach和ConcurrentDictionary

var streamingDataList = new ConcurrentDictionary<int, StreamingDataModel>();
Parallel.ForEach(streamingDataBatch, streamingData =>
{                            
streamingDataList.TryAdd(streamingData.Id,streamingData.Data));
});