具有序列id的线程安全固定大小的循环缓冲区



我需要一个具有以下功能的队列:

  • 固定大小(即循环缓冲区)
  • 队列项具有id(类似于主键),它们是顺序的
  • 线程安全的(用于多个ASP。. NET核心请求)

为了避免锁定,我尝试了ConcurrentQueue,但发现了竞争条件。所以我在尝试一种自定义的方法。

public interface IQueueItem
{
long Id { get; set; }
}
public class CircularBuffer<T> : LinkedList<T> where T : class, IQueueItem
{
public CircularBuffer(int capacity) => _capacity = capacity;
private readonly int _capacity;
private long _counter = 0;
private readonly object _lock = new();
public void Enqueue(T item)
{
lock (_lock) {         // works but feels "heavy"
_counter++;
item.Id = _counter;
if (Count == _capacity) RemoveFirst();
AddLast(item);
}
}
}

And to test:

public class Item : IQueueItem
{
public long Id { get; set; }
//...
}
public class Program
{
public static void Main()
{
var q = new CircularBuffer<Item>(10);
Parallel.For(0, 15, i => q.Enqueue(new Item()));
Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
}
}

给出正确的输出(即使被竞争线程排队也是有序的,并且具有固定的大小,最老的项目被取消队列):

6, 7, 8, 9, 10, 11, 12, 13, 14, 15

在现实中,我有web请求读取(即枚举)队列。

:如果一个线程正在枚举队列,而另一个线程正在添加队列,我将有错误。(我可以在读取之前使用ToList(),但是对于一个大队列,它会占用服务器的所有内存,因为多个请求可以在一秒钟内完成多次)。我该如何处理这种情况?我使用的是链表,但我可以灵活地使用任何结构

(另外,这似乎是一个非常重的锁段;有没有更高效的方法?)

UPDATE
如下面的评论所述:我期望队列有几百到几万个项目,但是项目本身很小(只有几个基本数据类型)。我估计每秒钟都会有一个人排队。从web请求中读取的次数较少,假设每分钟几次(但可以同时发生在服务器向队列写入时)。

根据您在问题中提供的指标,您有很多选择。预期使用的CircularBuffer<T>并不是真的那么重。包装一个lock保护的Queue<T>应该工作得很好。在每个枚举上将队列内容复制到数组中的成本(每分钟复制10,000个元素几次)不太可能引人注目。现代机器可以在眨眼间做这些事情。您必须每秒枚举该集合数百次才能开始(稍微)成为一个问题。

在我最初的回答(修订3)中,我建议使用ImmutableQueue<T>作为底层存储。在仔细检查之后,我注意到这个类并不是真正的付费游戏。第一次枚举它时,它调用内部的BackwardsReversed属性(源代码),这是相当昂贵的。我的性能测试证实,在cpu时间和分配方面,它比lonix的答案中显示的简单的lock保护的Queue<T>解决方案更差。

下面是一个类似想法的低级实现,它利用了我们只需要ImmutableQueue<T>类功能的一个子集的事实。这些项存储在一个单链表结构中,可以在没有额外开销的情况下进行枚举:

public class ConcurrentCircularBuffer<T> : IEnumerable<T> where T : IQueueItem
{
private readonly object _locker = new();
private readonly int _capacity;
private Node _head;
private Node _tail;
private int _count = 0;
private long _lastId = 0;
private class Node
{
public readonly T Item;
public Node Next;
public Node(T item) => Item = item;
}
public ConcurrentCircularBuffer(int capacity)
{
if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
_capacity = capacity;
}
public int Count => Volatile.Read(ref _count);
public void Enqueue(T item)
{
lock (_locker)
{
Node node = new(item);
if (_head is null) _head = node;
if (_tail is not null) _tail.Next = node;
_tail = node;
if (_count < _capacity) _count++; else _head = _head.Next;
item.Id = ++_lastId;
}
}
public IEnumerator<T> GetEnumerator()
{
Node node; int count;
lock (_locker) { node = _head; count = _count; }
for (int i = 0; i < count && node is not null; i++, node = node.Next)
yield return node.Item;
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

lock保护的Queue<T>相比,这种方法的主要优点是它最小化了争用。握住锁所做的工作是微不足道的。

ConcurrentCircularBuffer<T>类的另一种实现,基于两个数组缓冲区,具有不同的优缺点,可以在这个答案的第5版中找到。

既然ConcurrentQueue不在这个问题中,你可以尝试固定数组。

IQueueItem[] items = new IQueueItem[SIZE];
long id = 0;

Enqueue很简单。

void Enqueue(IQueueItem item)
{
long id2 = Interlocked.Increment(ref id);
item.Id = id2 - 1;
items[id2 % SIZE] = item;
}

要输出数据,只需要将数组复制到一个新数组,然后对其排序。(当然,可以在这里进行优化)

var arr = new IQueueItem[SIZE];
Array.Copy(items, arr, SIZE);
return arr.Where(a => a != null).OrderBy(a => a.Id);

由于并发插入,数组中可能会有一些间隙,您可以取一个序列直到找到间隙。

var e = arr.Where(a => a != null).OrderBy(a => a.Id);
var firstId = e.First().Id;
return e.TakeWhile((a, index) => a.Id - index == firstId);

下面是另一个实现,使用带锁的Queue<T>

public interface IQueueItem
{
long Id { get; set; }
}
public class CircularBuffer<T> : IEnumerable<T> where T : class, IQueueItem
{
private readonly int _capacity;
private readonly Queue<T> _queue;
private long _lastId = 0;
private readonly object _lock = new();
public CircularBuffer(int capacity) {
_capacity = capacity;
_queue = new Queue<T>(capacity);
}
public void Enqueue(T item)
{
lock (_lock) {
if (_capacity < _queue.Count)
_queue.Dequeue();
item.Id = ++_lastId;
_queue.Enqueue(item);
}
}
public IEnumerator<T> GetEnumerator()
{
lock (_lock) {
var copy = _queue.ToArray();
return ((IEnumerable<T>)copy).GetEnumerator();
}
}
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

And to test:

public class Item : IQueueItem
{
private long _id;
public long Id
{
get => Volatile.Read(ref _id);
set => Volatile.Write(ref _id, value);
}
}
public class Program
{
public static void Main()
{
var q = new CircularBuffer<Item>(10);
Parallel.For(0, 15, i => q.Enqueue(new Item()));
Console.WriteLine(string.Join(", ", q.Select(x => x.Id)));
}
}

结果:

6, 7, 8, 9, 10, 11, 12, 13, 14, 15