按顺序异步处理来自服务器的数据



我正在使用NetCoreServer连接到XMPP聊天服务器。一切都按计划进行。每当服务器发送消息时,我都会使用普通方法processData(string data)进行处理。问题是,如果该方法花费的时间超过特定的时间,服务器就会关闭连接。

我曾考虑异步执行该方法,但问题是来自服务器的消息可能会被分割成多个部分。处理数据的方法检测到这一点,如果接收到的消息只是整个消息的一部分,它会存储它。下次调用时,它会将新消息附加到旧消息上,检查新消息是否完成了它,或者是否需要等待下一条消息,以此类推,直到它有一条完整的消息。然后它将继续处理它,所以如果它是异步调用的,则调用必须等待以前的调用才能执行,而不会阻塞NetCoreServer的OnReceive。

每当来自服务器的新数据到达时,我都会考虑将var task=new Task(() => { ProcessData(result); });添加到队列中,但我不知道如何将它们的执行链接起来,也不知道如何继续。或者,我可以在数据到达时将其存储在队列中,并在队列中添加新消息时以某种方式触发事件来调用ProcessData。但我也有同样的问题,除了不知道如何触发之外,触发的事件应该等待前一个事件完成。

ProcessData看起来像这样:

public Class DataProcessor
{
private string Buffer;
public void processData(string data)
{
if(PartialData(data)) {
Buffer+=data;
return;
}
else //continue processing
}

有很多工具可以用来解决这个问题。在这里,我将展示一个TPL数据流解决方案。您将需要两个ActionBlock<T>,一个用于连接消息的拆分部分,另一个用于处理完整消息。我在下面按相反的顺序写,因为第一个区块在构建过程中需要了解第二个区块。本例假设每个完整消息的最后部分以一个点字符结尾:

var block2 = new ActionBlock<string[]>(parts =>
{
string completeMessage = String.Join(" ", parts);
Console.WriteLine($"Processing message: {completeMessage}");
});
var parts = new List<string>();
var block1 = new ActionBlock<string>(rawMessage =>
{
if (rawMessage is null) { block2.Complete(); return; }
parts.Add(rawMessage);
if (rawMessage.EndsWith("."))
{
block2.Post(parts.ToArray());
parts.Clear();
}
});
block1.Post("Hello");
block1.Post("world.");
block1.Post("The quick");
block1.Post("brown fox.");
block1.Post(null); // Signal that there are no more messages
block1.Complete();
await block2.Completion;
Console.WriteLine("Processing terminated");

输出:

Processing message: Hello world.
Processing message: The quick brown fox.
Processing terminated

(现场演示(

正如您所看到的,有一个List<string>,它保存了当前收到的不完整消息的部分。每次消息完成时,部分都会传播到block2,并且列表会被清除。

值为null的特殊消息表示将不再接收消息,因此您可以将Complete作为block2,将await作为其Completion,以干净而优雅地结束该过程。

两个ActionBlock<T>相互平行工作。它们中的每一个都包含自己的内部输入队列,其中包含要处理的消息(该队列是无边界的(。Post方法只是将消息发送到目标块的输入队列,而不等待消息的处理。如果消息被目标块接受,则该方法返回true,否则返回false。不接受消息的常见原因是调用了Complete方法,或者由于发生了未处理的异常而导致块失败。

TPL数据流库是.NET6中标准库的一部分。你不需要安装任何东西就可以使用它。除非你的目标是旧的.NET Framework,你需要在其中安装这个NuGet包。

最新更新