并发收集,支持随机(FIFO)和特定的Remove



我正在编写一个应用程序,该应用程序管理一个集合,该集合需要在多线程环境中频繁地对项目进行排队和脱队。对于单线程,一个简单的List可能就足够了,但是环境的并发性带来了一些问题。

以下是摘要:

结构需要有一个bool TryAdd(T)方法,最好是Add(TKey, TValue);

结构需要有一个T TryRemove()方法,该方法接受随机的或最好是第一个添加的项(本质上实现了FIFO队列);

结构体需要有一个bool TryRemove(T)方法,最好是Remove(TKey);

到目前为止,我有三个想法,都有他们的问题:

  1. 实现一个包含ConcurrentDictionary<TKey,>和这样的ConcurrentQueue:
internal class ConcurrentQueuedDictionary<TKey, TValue> where TKey : notnull
{
ConcurrentDictionary<TKey, TValue> _dictionary;
ConcurrentQueue<TKey> _queue;
object _locker;
public bool TryAdd(TKey key, TValue value)
{
if (!_dictionary.TryAdd(key, value))
return false;
lock (_locker)
_queue.Enqueue(key);
return true;
}
public TValue TryRemove()
{
TKey key;
lock (_locker) {
if (_queue.IsEmpty)
return default(TValue);
_queue.TryDequeue(out key);
}
TValue value;
if (!_dictionary.Remove(key, out value))
throw new Exception();
return value;
}
public bool TryRemove(TKey key)
{
lock (_locker)
{
var copiedList = _queue.ToList();
if (copiedList.Remove(key))
return false;
_queue = new(copiedList);
}
return _dictionary.TryRemove(key, out _);
}
}

,但这将需要锁在Remove(T)上,因为它需要初始队列的完整深度副本,不包含被删除的项,同时不允许从其他线程读取,这意味着至少Remove()也会有这个锁,这意味着这是一个经常执行的操作;

  1. 实现一个包含ConcurrentDictionary<TKey,>和ConcurrentDictionary
  2. 直接锁定List,因为锁对于选项1是必要的。

任何想法?我有点被这个问题难住了,因为我对并发集合没有最好的把握。我需要一个自定义的IProducerConsumerCollection吗?是否有可能同时对并发集合元素进行随机(或排队)和特定访问?你们中有人以前遇到过这个问题吗,也许我看问题的角度不对?

编辑:打字错误,格式

通过组合内置并发集合来创建这样的并发结构应该几乎是不可能的,当然前提是正确性是最重要的,并且严格禁止竞争条件。好消息是,如果受保护区域内的操作是轻量级的(其持续时间以纳秒为单位),那么每秒获取数千次lock远没有达到争用开始成为问题的极限。

实现0(1)复杂度的一种方法是将LinkedList<T>Dictionary<K,V>组合在一起:

/// <summary>
/// Represents a thread-safe first in-first out (FIFO) collection of key/value pairs,
/// where the key is unique.
/// </summary>
public class ConcurrentKeyedQueue<TKey, TValue>
{
private readonly LinkedList<KeyValuePair<TKey, TValue>> _queue;
private readonly Dictionary<TKey, LinkedListNode<KeyValuePair<TKey, TValue>>>
_dictionary;
public ConcurrentKeyedQueue(IEqualityComparer<TKey> comparer = default)
{
_queue = new();
_dictionary = new(comparer);
}
public int Count { get { lock (_queue) return _queue.Count; } }
public bool TryEnqueue(TKey key, TValue value)
{
lock (_queue)
{
ref var node = ref CollectionsMarshal
.GetValueRefOrAddDefault(_dictionary, key, out bool exists);
if (exists) return false;
node = new(new(key, value));
_queue.AddLast(node);
Debug.Assert(_queue.Count == _dictionary.Count);
return true;
}
}
public bool TryDequeue(out TKey key, out TValue value)
{
lock (_queue)
{
if (_queue.Count == 0) { key = default; value = default; return false; }
var node = _queue.First;
(key, value) = node.Value;
_queue.RemoveFirst();
bool removed = _dictionary.Remove(key);
Debug.Assert(removed);
Debug.Assert(_queue.Count == _dictionary.Count);
return true;
}
}
public bool TryTake(TKey key, out TValue value)
{
lock (_queue)
{
bool removed = _dictionary.Remove(key, out var node);
if (!removed) { value = default; return false; }
_queue.Remove(node);
(_, value) = node.Value;
Debug.Assert(_queue.Count == _dictionary.Count);
return true;
}
}
public KeyValuePair<TKey, TValue>[] ToArray()
{
lock (_queue) return _queue.ToArray();
}
}

这个组合也用于创建LRU缓存。

您可以通过使用Monitor.LockContentionCount属性在您自己的负载环境中测量lock争用:"获取在尝试获取监视器锁时发生争用的次数。">如果您看到每秒的增量是一个单位数,则没有什么可担心的。

对于不使用CollectionsMarshal.GetValueRefOrAddDefault方法的版本,因此它可以在。net 6之前的版本上使用,请参阅此答案的第一个版本。

最新更新