如何对 ConcurrentBag 进行排序?



我正在开发客户端/服务器应用程序。服务器向客户端发送消息,但无法保证顺序。我正在使用TCP...我不想讨论为什么无法保证订单(这与服务器上的线程有关(。

无论如何,在客户端上,我正在处理这样的消息:

private Queue<byte[]> rawMessagesIn = new Queue<byte[]>();
public ConcurrentBag<ServerToClient> messages = new ConcurrentBag<ServerToClient>();
public void Start()
{
var processTask = Task.Factory.StartNew(() =>
{
while (run)
{
process();
}
});
}
void process(){
if(rawMessagesIn.Count > 0){
var raw_message = rawMessagesIn.Dequeue();
var message = (ServerToClient)Utils.Deserialize(raw_message);
messages.Add(message);
}
}
private void OnDataReceived(object sender, byte[] data)
{
rawMessagesIn.Enqueue(data);
}

现在,重要的是,当我调用messages.TryTake()messages.TryPeek()时,消息out是序列中的下一个。每条消息都有一个数字/整数来表示其顺序。例如,message.number = 1

我需要使用TryPeek,因为索引 0 处的消息可能是正确的消息,也可能是错误的消息,在这种情况下,我们会从包中删除消息。但是,该消息可能是将来需要的消息,因此不应将其删除。

我尝试使用message.OrderBy(x=>x.number).ToList();但我看不出它将如何工作。如果我使用OrderBySL获取排序列表,并且索引 0 处的项目是正确的,我不能简单地删除或修改messages,因为我不知道它在ConcurrentBag中的位置!

有人对我有建议吗?

我的建议是从手动管理队列切换到 TPL 数据流库中的TransformBlock<TInput,TOutput>。此组件是输入队列和输出队列以及将TInput转换为TOutput的处理器的组合。EnsureOrdered功能是内置的,它是默认的。例:

private readonly TransformBlock<byte[], ServerToClient> _transformer;
public Client() // Constructor
{
_transformer = new((byte[] raw_message) =>
{
ServerToClient message = (ServerToClient)Utils.Deserialize(raw_message);
return message;
}, new ExecutionDataflowBlockOptions()
{
EnsureOrdered = true, // Just for clarity. true is the default.
MaxDegreeOfParallelism = 1, // the default is 1
});
}
private void OnDataReceived(object sender, byte[] data)
{
bool accepted = _transformer.Post(data);
// The accepted will be false in case the _transformer has failed.
}
public bool TryReceiveAll(out IList<ServerToClient> messages)
{
return _transformer.TryReceiveAll(out messages);
}

有许多方法可以使用存储在块的输出队列中的ServerToClient消息。上面的示例演示了TryReceiveAll方法。还有TryReceiveReceiveReceiveAsyncReceiveAllAsync(其中一些是扩展方法(。您还可以使用较低级别的方法OutputAvailableAsync,如下所示。还可以选择将其链接到另一个数据流块。

相关内容

  • 没有找到相关文章

最新更新