我有三个问题:
- 您通常如何看待我解决给定问题的方法?
- 您认为我可以进一步提高性能吗?
- 最重要的一点:如何使我的实现真正安全?
首先,我所处的简化场景:我正在通过消息传递系统与不同的设备进行通信。我在相当短的时间内接收和发送了成千上万的消息。我在一个多线程环境中,所以许多不同的任务正在发送和期待消息。对于消息接收,事件驱动的方法在使其线程安全方面给我们带来了很多麻烦。我有一些 Receiver 任务,它们从外部获取消息,并且必须将这些消息传递给许多消费者任务。
所以我想出了一个不同的方法:为什么不有一个几千条消息的历史记录,其中每个新消息都排队,使用者任务可以从最新项目向后搜索到最后一个处理的项目,以便获取所有新到达的消息。当然,这必须是快速且线程安全的。
我想出了链接环缓冲区的想法,并实现了以下内容:
public class LinkedRingBuffer<T>
{
private LinkedRingBufferNode<T> firstNode;
private LinkedRingBufferNode<T> lastNode;
public LinkedRingBuffer(int capacity)
{
Capacity = capacity;
Count = 0;
}
/// <summary>
/// Maximum count of items inside the buffer
/// </summary>
public int Capacity { get; }
/// <summary>
/// Actual count of items inside the buffer
/// </summary>
public int Count { get; private set; }
/// <summary>
/// Get value of the oldest buffer entry
/// </summary>
/// <returns></returns>
public T GetFirst()
{
return firstNode.Item;
}
/// <summary>
/// Get value of the newest buffer entry
/// </summary>
/// <returns></returns>
public T GetLast()
{
return lastNode.Item;
}
/// <summary>
/// Add item at the end of the buffer.
/// If capacity is reached the link to the oldest item is deleted.
/// </summary>
public void Add(T item)
{
/* create node and set to last one */
var node = new LinkedRingBufferNode<T>(lastNode, item);
lastNode = node;
/* if it is the first node, the created is also the first */
if (firstNode == null)
firstNode = node;
/* check for capacity reach */
Count++;
if(Count > Capacity)
{/* deleted all links to the current first so that its eventually gc collected */
Count = Capacity;
firstNode = firstNode.NextNode;
firstNode.PreviousNode = null;
}
}
/// <summary>
/// Iterate through the buffer from the oldest to the newest item
/// </summary>
public IEnumerable<T> LastToFirst()
{
var current = lastNode;
while(current != null)
{
yield return current.Item;
current = current.PreviousNode;
}
}
/// <summary>
/// Iterate through the buffer from the newest to the oldest item
/// </summary>
public IEnumerable<T> FirstToLast()
{
var current = firstNode;
while (current != null)
{
yield return current.Item;
current = current.NextNode;
}
}
/// <summary>
/// Iterate through the buffer from the oldest to given item.
/// If item doesn't exist it iterates until it reaches the newest
/// </summary>
public IEnumerable<T> LastToReference(T item)
{
var current = lastNode;
while (current != null)
{
yield return current.Item;
if (current.Item.Equals(item))
break;
current = current.PreviousNode;
}
}
/// <summary>
/// Iterate through the buffer from the newest to given item.
/// If item doesn't exist it iterates until it reaches the oldest
/// </summary>
public IEnumerable<T> FirstToReference(T item)
{
var current = firstNode;
while (current != null)
{
yield return current.Item;
if (current.Item.Equals(item))
break;
current = current.PreviousNode;
}
}
/// <summary>
/// Represents a linked node inside the buffer and holds the data
/// </summary>
private class LinkedRingBufferNode<A>
{
public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
{
Item = item;
NextNode = null;
PreviousNode = previousNode;
if(previousNode != null)
previousNode.NextNode = this;
}
internal A Item { get; }
internal LinkedRingBufferNode<A> PreviousNode { get; set; }
internal LinkedRingBufferNode<A> NextNode { get; private set; }
}
}
但不幸的是,我对多线程环境有点陌生,那么我如何使这个缓冲线程对多次读取和写入安全?
谢谢!
我认为最简单的方法是有一个同步object
,每当执行线程关键代码时,您都会lock
该同步。lock
块中的代码称为关键部分,一次只能由一个线程访问。任何其他希望访问它的线程都将等待,直到锁被释放。
定义和初始化:
private object Synchro;
public LinkedRingBuffer(int capacity)
{
Synchro = new object();
// Other constructor code
}
用法:
public T GetFirst()
{
lock(Synchro)
{
return firstNode.Item;
}
}
编写线程安全代码时,lock
某些部分似乎很明显。但是,如果您不确定是否要lock
语句或代码块,为了读写安全,您需要考虑:
- 此代码是否会影响任何其他锁定的关键部分的行为或结果。
- 任何其他锁定的关键部分是否会影响此代码的行为或结果。
您还需要重写一些自动实现的属性以具有支持字段。但是,这应该非常简单...
您对yield return
的使用虽然在单线程上下文中非常智能和高效,但在多线程上下文中会造成麻烦。这是因为收益率回报不会释放锁定语句(也不应该)。无论您在哪里使用 yield return
,您都必须在包装器中执行具体化。
线程安全代码如下所示:
public class LinkedRingBuffer<T>
{
private LinkedRingBufferNode<T> firstNode;
private LinkedRingBufferNode<T> lastNode;
private object Synchro;
public LinkedRingBuffer(int capacity)
{
Synchro = new object();
Capacity = capacity;
Count = 0;
}
/// <summary>
/// Maximum count of items inside the buffer
/// </summary>
public int Capacity { get; }
/// <summary>
/// Actual count of items inside the buffer
/// </summary>
public int Count
{
get
{
lock (Synchro)
{
return _count;
}
}
private set
{
_count = value;
}
}
private int _count;
/// <summary>
/// Get value of the oldest buffer entry
/// </summary>
/// <returns></returns>
public T GetFirst()
{
lock (Synchro)
{
return firstNode.Item;
}
}
/// <summary>
/// Get value of the newest buffer entry
/// </summary>
/// <returns></returns>
public T GetLast()
{
lock (Synchro)
{
return lastNode.Item;
}
}
/// <summary>
/// Add item at the end of the buffer.
/// If capacity is reached the link to the oldest item is deleted.
/// </summary>
public void Add(T item)
{
lock (Synchro)
{
/* create node and set to last one */
var node = new LinkedRingBufferNode<T>(lastNode, item);
lastNode = node;
/* if it is the first node, the created is also the first */
if (firstNode == null)
firstNode = node;
/* check for capacity reach */
Count++;
if (Count > Capacity)
{
/* deleted all links to the current first so that its eventually gc collected */
Count = Capacity;
firstNode = firstNode.NextNode;
firstNode.PreviousNode = null;
}
}
}
/// <summary>
/// Iterate through the buffer from the oldest to the newest item
/// </summary>
public IEnumerable<T> LastToFirst()
{
lock (Synchro)
{
var materialized = LastToFirstInner().ToList();
return materialized;
}
}
private IEnumerable<T> LastToFirstInner()
{
var current = lastNode;
while (current != null)
{
yield return current.Item;
current = current.PreviousNode;
}
}
/// <summary>
/// Iterate through the buffer from the newest to the oldest item
/// </summary>
public IEnumerable<T> FirstToLast()
{
lock (Synchro)
{
var materialized = FirstToLastInner().ToList();
return materialized;
}
}
private IEnumerable<T> FirstToLastInner()
{
var current = firstNode;
while (current != null)
{
yield return current.Item;
current = current.NextNode;
}
}
/// <summary>
/// Iterate through the buffer from the oldest to given item.
/// If item doesn't exist it iterates until it reaches the newest
/// </summary>
public IEnumerable<T> LastToReference(T item)
{
lock (Synchro)
{
var materialized = LastToReferenceInner(item).ToList();
return materialized;
}
}
private IEnumerable<T> LastToReferenceInner(T item)
{
var current = lastNode;
while (current != null)
{
yield return current.Item;
if (current.Item.Equals(item))
break;
current = current.PreviousNode;
}
}
/// <summary>
/// Iterate through the buffer from the newest to given item.
/// If item doesn't exist it iterates until it reaches the oldest
/// </summary>
public IEnumerable<T> FirstToReference(T item)
{
lock (Synchro)
{
var materialized = FirstToReferenceInner(item).ToList();
return materialized;
}
}
private IEnumerable<T> FirstToReferenceInner(T item)
{
var current = firstNode;
while (current != null)
{
yield return current.Item;
if (current.Item.Equals(item))
break;
current = current.PreviousNode;
}
}
/// <summary>
/// Represents a linked node inside the buffer and holds the data
/// </summary>
private class LinkedRingBufferNode<A>
{
public LinkedRingBufferNode(LinkedRingBufferNode<A> previousNode, A item)
{
Item = item;
NextNode = null;
PreviousNode = previousNode;
if (previousNode != null)
previousNode.NextNode = this;
}
internal A Item { get; }
internal LinkedRingBufferNode<A> PreviousNode { get; set; }
internal LinkedRingBufferNode<A> NextNode { get; private set; }
}
}
可以进行一些优化,例如,您不需要在关键部分中创建LinkedRingBufferNode
对象,但是在创建对象之前,您必须将lastNode
值复制到关键部分中的局部变量。