我正在开发客户端/服务器应用程序。服务器向客户端发送消息,但无法保证顺序。我正在使用TCP...我不想讨论为什么无法保证订单(这与服务器上的线程有关(。
无论如何,在客户端上,我正在处理这样的消息:
private Queue<byte[]> rawMessagesIn = new Queue<byte[]>();
public ConcurrentBag<ServerToClient> messages = new ConcurrentBag<ServerToClient>();
public void Start()
{
var processTask = Task.Factory.StartNew(() =>
{
while (run)
{
process();
}
});
}
void process(){
if(rawMessagesIn.Count > 0){
var raw_message = rawMessagesIn.Dequeue();
var message = (ServerToClient)Utils.Deserialize(raw_message);
messages.Add(message);
}
}
private void OnDataReceived(object sender, byte[] data)
{
rawMessagesIn.Enqueue(data);
}
现在,重要的是,当我调用messages.TryTake()
或messages.TryPeek()
时,消息out
是序列中的下一个。每条消息都有一个数字/整数来表示其顺序。例如,message.number = 1
我需要使用TryPeek
,因为索引 0 处的消息可能是正确的消息,也可能是错误的消息,在这种情况下,我们会从包中删除消息。但是,该消息可能是将来需要的消息,因此不应将其删除。
我尝试使用message.OrderBy(x=>x.number).ToList();
但我看不出它将如何工作。如果我使用OrderBy
并SL
获取排序列表,并且索引 0 处的项目是正确的,我不能简单地删除或修改messages
,因为我不知道它在ConcurrentBag
中的位置!
有人对我有建议吗?
我的建议是从手动管理队列切换到 TPL 数据流库中的TransformBlock<TInput,TOutput>
。此组件是输入队列和输出队列以及将TInput
转换为TOutput
的处理器的组合。EnsureOrdered
功能是内置的,它是默认的。例:
private readonly TransformBlock<byte[], ServerToClient> _transformer;
public Client() // Constructor
{
_transformer = new((byte[] raw_message) =>
{
ServerToClient message = (ServerToClient)Utils.Deserialize(raw_message);
return message;
}, new ExecutionDataflowBlockOptions()
{
EnsureOrdered = true, // Just for clarity. true is the default.
MaxDegreeOfParallelism = 1, // the default is 1
});
}
private void OnDataReceived(object sender, byte[] data)
{
bool accepted = _transformer.Post(data);
// The accepted will be false in case the _transformer has failed.
}
public bool TryReceiveAll(out IList<ServerToClient> messages)
{
return _transformer.TryReceiveAll(out messages);
}
有许多方法可以使用存储在块的输出队列中的ServerToClient
消息。上面的示例演示了TryReceiveAll
方法。还有TryReceive
、Receive
、ReceiveAsync
和ReceiveAllAsync
(其中一些是扩展方法(。您还可以使用较低级别的方法OutputAvailableAsync
,如下所示。还可以选择将其链接到另一个数据流块。