C# .NET - 带计时器的缓冲区消息



我需要实现一个同样基于定时的消息缓冲系统。

我需要做的是存储我的类的实例,然后在达到 100 个实例或 1 分钟后将它们转发。

基本上:

List<Message> messages;
public void GotNewMessage(Message msg)
{
messages.add(msg);
if (messages.count() == 100 || timer.elapsed(1 minute))
{
SendMessages(messages);
messages.clear()
}
}

我似乎无法弄清楚如何在不过度使用锁的情况下实现这一点,这将大大减慢该过程。有谁知道实施这样一个系统的好方法?提前谢谢。

对于这些需求(将时间与序列相结合),有一个很棒的库,它是反应式扩展。见 https://github.com/Reactive-Extensions/Rx.NET

然后你可以写一些类似的东西

void Main()
{
messages
.Buffer(TimeSpan.FromMinutes(1), 100) // Buffer until 100 items or 1 minute has elapsed, whatever comes first.
.Subscribe(msgs => SendMessages(msgs));     
}
Subject<Message> messages = new Subject<Message>();
public void GotNewMessage(Message msg)
{
messages.OnNext(msg);
}

注意:这还没有准备好生产,但它显示了如何做到这一点的基本知识。根据您从何处接收消息,有更好的方法来创建要订阅的可观察对象。

更多参考资料:

  • http://www.introtorx.com/
  • https://msdn.microsoft.com/en-us/library/hh242985(v=vs.103).aspx

如果使用事件接收消息,则可以将事件链接到 RX 流,请参阅 https://msdn.microsoft.com/en-us/library/hh242978(v=vs.103).aspx 和 https://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.fromeventpattern(v=vs.103).aspx

首先,你应该考虑使用ConcurrentQueue<>而不是List<>ConcurrentQueue<>始终是线程安全的,不需要额外的锁。这样,您已经为自己节省了消息队列的锁。联锁在不可用时提供原子性。

根据 C# 语言规范,独立读/写是原子的(但仅适用于某些数据类型,并且 long并不总是原子的 - 这就是为什么我移动DateTime.Now.Ticks以获得 int32 而不会丢失任何会影响经过时间的位)和读取-修改-写入(例如 ++i)从来都不是原子的。

移位(例如<<</strong>)是独立的原子,不需要任何额外的锁定。

private ConcurrentQueue<Message> Queue = new ConcurrentQueue<Message>();
private int QueueSize = 0;
private int LastSend = (int)(DateTime.Now.Ticks >> 23);
private int LastMessage = (int)(DateTime.Now.Ticks >> 23);
public void GotNewMessage(Message Message)
{
Queue.Enqueue(Message);
Interlocked.Increment(ref QueueSize);
Interlocked.Exchange(ref LastMessage, (int)(DateTime.Now.Ticks >> 23));
if (Interlocked.CompareExchange(ref QueueSize, 0, 100) >= 100 || 
LastMessage - LastSend >= 60)
{
Message Dummy;
while (!Queue.IsEmpty)
if (Queue.TryDequeue(out Dummy))
SendMessage(Dummy);
Interlocked.Exchange(ref LastSend, (int)(DateTime.Now.Ticks >> 23));
}
}
public void SendMessage(Message Message)
{
// ...
}

编辑:可能会发生,发送超过100条消息。如果您希望严格发送 100 条消息,则可以在循环中实现另一个原子增量。

最新更新