使 LinkedRingBuffer 线程的 C# 实现安全



我有三个问题:

  1. 您通常如何看待我解决给定问题的方法?
  2. 您认为我可以进一步提高性能吗?
  3. 最重要的一点:如何使我的实现真正安全?

首先,我所处的简化场景:我正在通过消息传递系统与不同的设备进行通信。我在相当短的时间内接收和发送了成千上万的消息。我在一个多线程环境中,所以许多不同的任务正在发送和期待消息。对于消息接收,事件驱动的方法在使其线程安全方面给我们带来了很多麻烦。我有一些 Receiver 任务,它们从外部获取消息,并且必须将这些消息传递给许多消费者任务。

所以我想出了一个不同的方法:为什么不有一个几千条消息的历史记录,其中每个新消息都排队,使用者任务可以从最新项目向后搜索到最后一个处理的项目,以便获取所有新到达的消息。当然,这必须是快速且线程安全的。

我想出了链接环缓冲区的想法,并实现了以下内容:

  public class LinkedRingBuffer<T>
  {
     private LinkedRingBufferNode<T> firstNode;
     private LinkedRingBufferNode<T> lastNode;
     public LinkedRingBuffer(int capacity)
     {
        Capacity = capacity;
        Count = 0;
     }
     /// <summary>
     /// Maximum count of items inside the buffer
     /// </summary>
     public int Capacity { get; }
     /// <summary>
     /// Actual count of items inside the buffer
     /// </summary>
     public int Count { get; private set; }
     /// <summary>
     /// Get value of the oldest buffer entry
     /// </summary>
     /// <returns></returns>
     public T GetFirst()
     {
        return firstNode.Item;
     }
     /// <summary>
     /// Get value of the newest buffer entry
     /// </summary>
     /// <returns></returns>
     public T GetLast()
     {
        return lastNode.Item;
     }
     /// <summary>
     /// Add item at the end of the buffer. 
     /// If capacity is reached the link to the oldest item is deleted.
     /// </summary>
     public void Add(T item)
     {
        /* create node and set to last one */
        var node = new LinkedRingBufferNode<T>(lastNode, item);
        lastNode = node;
        /* if it is the first node, the created is also the first */
        if (firstNode == null)
           firstNode = node;
        /* check for capacity reach */
        Count++;
        if(Count > Capacity)
        {/* deleted all links to the current first so that its eventually gc collected */
           Count = Capacity;
           firstNode = firstNode.NextNode;
           firstNode.PreviousNode = null;
        }
     }
     /// <summary>
     /// Iterate through the buffer from the oldest to the newest item
     /// </summary>
     public IEnumerable<T> LastToFirst()
     {
        var current = lastNode;
        while(current != null)
        {
           yield return current.Item;
           current = current.PreviousNode;
        }
     }
     /// <summary>
     /// Iterate through the buffer from the newest to the oldest item
     /// </summary>
     public IEnumerable<T> FirstToLast()
     {
        var current = firstNode;
        while (current != null)
        {
           yield return current.Item;
           current = current.NextNode;
        }
     }
     /// <summary>
     /// Iterate through the buffer from the oldest to given item. 
     /// If item doesn't exist it iterates until it reaches the newest
     /// </summary>
     public IEnumerable<T> LastToReference(T item)
     {
        var current = lastNode;
        while (current != null)
        {
           yield return current.Item;
           if (current.Item.Equals(item))
              break;
           current = current.PreviousNode;
        }
     }
     /// <summary>
     /// Iterate through the buffer from the newest to given item. 
     /// If item doesn't exist it iterates until it reaches the oldest
     /// </summary>
     public IEnumerable<T> FirstToReference(T item)
     {
        var current = firstNode;
        while (current != null)
        {
           yield return current.Item;
           if (current.Item.Equals(item))
              break;
           current = current.PreviousNode;
        }
     }
     /// <summary>
     /// Represents a linked node inside the buffer and holds the data
     /// </summary>
     private class LinkedRingBufferNode<A>
     {
        public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
        {
           Item = item;
           NextNode = null;
           PreviousNode = previousNode;
           if(previousNode != null)
              previousNode.NextNode = this;
        }
        internal A Item { get; }
        internal LinkedRingBufferNode<A> PreviousNode { get; set; }
        internal LinkedRingBufferNode<A> NextNode { get; private set; }
     }
  }

但不幸的是,我对多线程环境有点陌生,那么我如何使这个缓冲线程对多次读取和写入安全?

谢谢!

我认为最简单的方法是有一个同步object,每当执行线程关键代码时,您都会lock该同步。lock块中的代码称为关键部分,一次只能由一个线程访问。任何其他希望访问它的线程都将等待,直到锁被释放。

定义和初始化:

private object Synchro;
public LinkedRingBuffer(int capacity)
{
    Synchro = new object();
    // Other constructor code
}

用法:

public T GetFirst()
{
    lock(Synchro)
    {
        return firstNode.Item;
    }
}

编写线程安全代码时,lock某些部分似乎很明显。但是,如果您不确定是否要lock语句或代码块,为了读写安全,您需要考虑:

  • 此代码是否会影响任何其他锁定的关键部分的行为或结果。
  • 任何其他锁定的关键部分是否会影响此代码的行为或结果。

您还需要重写一些自动实现的属性以具有支持字段。但是,这应该非常简单...

您对yield return的使用虽然在单线程上下文中非常智能和高效,但在多线程上下文中会造成麻烦。这是因为收益率回报不会释放锁定语句(也不应该)。无论您在哪里使用 yield return,您都必须在包装器中执行具体化。

线程安全代码如下所示:

public class LinkedRingBuffer<T>
{
    private LinkedRingBufferNode<T> firstNode;
    private LinkedRingBufferNode<T> lastNode;
    private object Synchro;
    public LinkedRingBuffer(int capacity)
    {
        Synchro = new object();
        Capacity = capacity;
        Count = 0;
    }
    /// <summary>
    /// Maximum count of items inside the buffer
    /// </summary>
    public int Capacity { get; }
    /// <summary>
    /// Actual count of items inside the buffer
    /// </summary>
    public int Count
    {
        get
        {
            lock (Synchro)
            {
                return _count;
            }
        }
        private set
        {
            _count = value;
        }
    }
    private int _count;
    /// <summary>
    /// Get value of the oldest buffer entry
    /// </summary>
    /// <returns></returns>
    public T GetFirst()
    {
        lock (Synchro)
        {
            return firstNode.Item;
        }
    }
    /// <summary>
    /// Get value of the newest buffer entry
    /// </summary>
    /// <returns></returns>
    public T GetLast()
    {
        lock (Synchro)
        {
            return lastNode.Item;
        }
    }
    /// <summary>
    /// Add item at the end of the buffer. 
    /// If capacity is reached the link to the oldest item is deleted.
    /// </summary>
    public void Add(T item)
    {
        lock (Synchro)
        {
            /* create node and set to last one */
            var node = new LinkedRingBufferNode<T>(lastNode, item);
            lastNode = node;
            /* if it is the first node, the created is also the first */
            if (firstNode == null)
                firstNode = node;
            /* check for capacity reach */
            Count++;
            if (Count > Capacity)
            {
                /* deleted all links to the current first so that its eventually gc collected */
                Count = Capacity;
                firstNode = firstNode.NextNode;
                firstNode.PreviousNode = null;
            }
        }
    }
    /// <summary>
    /// Iterate through the buffer from the oldest to the newest item
    /// </summary>
    public IEnumerable<T> LastToFirst()
    {
        lock (Synchro)
        {
            var materialized = LastToFirstInner().ToList();
            return materialized;
        }
    }
    private IEnumerable<T> LastToFirstInner()
    {
        var current = lastNode;
        while (current != null)
        {
            yield return current.Item;
            current = current.PreviousNode;
        }
    }
    /// <summary>
    /// Iterate through the buffer from the newest to the oldest item
    /// </summary>
    public IEnumerable<T> FirstToLast()
    {
        lock (Synchro)
        {
            var materialized = FirstToLastInner().ToList();
            return materialized;
        }
    }
    private IEnumerable<T> FirstToLastInner()
    {
        var current = firstNode;
        while (current != null)
        {
            yield return current.Item;
            current = current.NextNode;
        }
    }
    /// <summary>
    /// Iterate through the buffer from the oldest to given item. 
    /// If item doesn't exist it iterates until it reaches the newest
    /// </summary>
    public IEnumerable<T> LastToReference(T item)
    {
        lock (Synchro)
        {
            var materialized = LastToReferenceInner(item).ToList();
            return materialized;
        }
    }
    private IEnumerable<T> LastToReferenceInner(T item)
    {
        var current = lastNode;
        while (current != null)
        {
            yield return current.Item;
            if (current.Item.Equals(item))
                break;
            current = current.PreviousNode;
        }
    }
    /// <summary>
    /// Iterate through the buffer from the newest to given item. 
    /// If item doesn't exist it iterates until it reaches the oldest
    /// </summary>
    public IEnumerable<T> FirstToReference(T item)
    {
        lock (Synchro)
        {
            var materialized = FirstToReferenceInner(item).ToList();
            return materialized;
        }
    }
    private IEnumerable<T> FirstToReferenceInner(T item)
    {
        var current = firstNode;
        while (current != null)
        {
            yield return current.Item;
            if (current.Item.Equals(item))
                break;
            current = current.PreviousNode;
        }
    }
    /// <summary>
    /// Represents a linked node inside the buffer and holds the data
    /// </summary>
    private class LinkedRingBufferNode<A>
    {
        public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
        {
            Item = item;
            NextNode = null;
            PreviousNode = previousNode;
            if (previousNode != null)
                previousNode.NextNode = this;
        }
        internal A Item { get; }
        internal LinkedRingBufferNode<A> PreviousNode { get; set; }
        internal LinkedRingBufferNode<A> NextNode { get; private set; }
    }
}

可以进行一些优化,例如,您不需要在关键部分中创建LinkedRingBufferNode对象,但是在创建对象之前,您必须将lastNode值复制到关键部分中的局部变量。

相关内容

  • 没有找到相关文章

最新更新