我有一个具有重复元素的并发BlockingCollection
。如何修改它来添加或获得不同的元素?
BlockingCollection
的默认备份存储为ConcurrentQueue
。正如其他人指出的那样,用它来添加不同的项是相当困难的。
但是,您可以创建实现IProducerConsumerCollection
的自己的集合类型,并将其传递给BlockingCollection
构造函数。
想象一个ConcurrentDictionary
,它包含当前在队列中的项的键。要添加项目,首先在字典上调用TryAdd
,如果项目不在字典中,则添加它,并将其添加到队列中。Take
(和TryTake
)从队列中获取下一项,将其从字典中移除,然后返回。
我更喜欢如果有一个并发的HashTable
,但因为没有一个,你将不得不与ConcurrentDictionary
。
下面是一个具有队列行为的IProducerConsumerCollection<T>
集合的实现,它也拒绝重复项:
public class ConcurrentQueueNoDuplicates<T> : IProducerConsumerCollection<T>
{
private readonly Queue<T> _queue = new();
private readonly HashSet<T> _set;
private object Locker => _queue;
public ConcurrentQueueNoDuplicates(IEqualityComparer<T> comparer = default)
{
_set = new(comparer);
}
public bool TryAdd(T item)
{
lock (Locker)
{
if (!_set.Add(item))
throw new DuplicateKeyException();
_queue.Enqueue(item); return true;
}
}
public bool TryTake(out T item)
{
lock (Locker)
{
if (_queue.Count == 0)
throw new InvalidOperationException();
item = _queue.Dequeue();
bool removed = _set.Remove(item);
Debug.Assert(removed);
return true;
}
}
public int Count { get { lock (Locker) return _queue.Count; } }
public bool IsSynchronized => false;
public object SyncRoot => throw new NotSupportedException();
public T[] ToArray() { lock (Locker) return _queue.ToArray(); }
public IEnumerator<T> GetEnumerator() => ToArray().AsEnumerable().GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
public void CopyTo(T[] array, int index) => throw new NotSupportedException();
public void CopyTo(Array array, int index) => throw new NotSupportedException();
}
public class DuplicateKeyException : InvalidOperationException { }
使用例子:
BlockingCollection<Item> queue = new(new ConcurrentQueueNoDuplicates<Item>());
//...
try { queue.Add(item); }
catch (DuplicateKeyException) { Console.WriteLine($"The {item} was rejected."); }
警告:调用queue.TryAdd(item);
没有预期的行为,如果项目是重复的返回false
。任何添加重复项的尝试都会导致DuplicateKeyException
。不要试图"修复"。通过返回false
来实现上述ConcurrentQueueNoDuplicates<T>.TryAdd
或TryTake
。BlockingCollection<T>
的反应是抛出一个不同的异常(InvalidOperationException
),在此之上,它的内部状态将被破坏。目前有……. NET 7)一个bug,它减少了BlockingCollection<T>
的有效容量,其底层存储具有返回false
的TryAdd
实现。. net 8已经修复了这个错误,它将防止损坏,但不会改变抛出错误的行为。