如何在单个线程中使用 C# async/await 正确读取和写入套接字/网络流?



我正在尝试用 C# 为专有 TCP 协议连接(发送/接收键+值对消息)编写客户端。我想使用NetworkStreamasync/await功能,以便我的程序可以使用单个线程(类似于JavaScript)读取和写入套接字,但是我对NetworkStream.ReadAsync的工作方式有问题。

这是我的计划大纲:

public static async Task Main(String[] args)
{
using( TcpClient tcp = new TcpClient() )
{
await tcp.ConnectAsync( ... );
using( NetworkStream ns = tcp.GetStream() )
{
while( true )
{
await RunInnerAsync( ns );
}
}
}
}
private static readonly ConcurrentQueue<NameValueCollection> pendingMessagesToSend = new ConcurrentQueue<NameValueCollection>();
private static async Task RunInnerAsync( NetworkStream ns )
{
// 1. Send any pending messages.
// 2. Read any newly received messages.
// 1:
while( !pendingMessagesToSend.IsEmpty )
{
// ( foreach Message, send down the NetworkStream here )
}
// 2:
Byte[] buffer = new Byte[1024];
while( ns.DataAvailable )
{
Int32 bytesRead = await ns.ReadAsync( buffer, 0, buffer.Length );
if( bytesRead == 0 ) break;
// ( process contents of `buffer` here )
} 
}

这里有一个问题:如果NetworkStream ns中没有要读取的数据(DataAvailable == false),那么Main中的while( true )循环会不断运行,并且CPU永远不会空闲 - 这很糟糕。

因此,如果我更改代码以删除DataAvailable检查并始终调用ReadAsync那么调用会有效地"阻止"直到数据可用 - 因此,如果没有数据到达,则此客户端将永远不会向远程主机发送任何消息。所以我考虑添加一个 500 毫秒左右的超时:

// 2:
Byte[] buffer = new Byte[1024];
while( ns.DataAvailable )
{
CancellationTokenSource cts = new CancellationTokenSource( 500 );
Task<Int32> readTask = ns.ReadAsync( buffer, 0, buffer.Length, cts.Token );
await readTask;
if( readTask.IsCancelled ) break;
// ( process contents of `buffer` here )
}

但是,这不起作用!显然,接受CancellationTokenNetworkStream.ReadAsync重载在实际请求取消时不会中止或停止,它总是忽略它(这怎么不是错误?

我链接到的 QA 建议了一种简单地关闭Socket/NetworkStream的解决方法 - 这对我来说是不合适的,因为我需要保持连接活动,但只是从等待数据到达并发送一些数据中休息一下

另一个答案建议共同等待Task.Delay,如下所示:

// 2:
Byte[] buffer = new Byte[1024];
while( ns.DataAvailable )
{
Task maxReadTime = Task.Delay( 500 );
Task readTask = ns.ReadAsync( buffer, 0, buffer.Length );
await Task.WhenAny( maxReadTime, readTask );
if( maxReadTime.IsCompleted )
{
// what do I do here to cancel the still-pending ReadAsync operation?
}
}

。然而,虽然这确实会阻止程序等待无限期的网络读取操作,但它不会停止读取操作本身- 所以当我的程序完成发送任何挂起的消息时,它会在等待数据到达的同时第二次调用ReadAsync- 这意味着处理重叠的 IO,根本不是我想要的。

我知道当直接使用Socket及其BeginReceive/EndReceive方法时,您只会从EndReceive内部调用BeginReceive- 但是第一次如何安全地调用BeginReceive,尤其是在循环中 - 以及在使用async/awaitAPI 时应该如何修改这些调用?

我通过从NetworkStream.ReadAsync获取Task并将其存储在一个可变的局部变量中来工作,如果没有挂起的ReadAsync操作,该变量设置为null,否则它是一个有效的Task实例。

不幸的是,由于async方法不能有ref参数,我需要将我的逻辑从RunInnerAsync移动到Main

这是我的解决方案:

private static readonly ConcurrentQueue<NameValueCollection> pendingMessagesToSend = new ConcurrentQueue<NameValueCollection>();
private static TaskCompletionSource<Object> queueTcs;
public static async Task Main(String[] args)
{
using( TcpClient tcp = new TcpClient() )
{
await tcp.ConnectAsync( ... );
using( NetworkStream ns = tcp.GetStream() )
{
Task<Int32> nsReadTask = null; // <-- this!
Byte[] buffer = new Byte[1024];
while( true )
{
if( nsReadTask == null )
{
nsReadTask = ns.ReadAsync( buffer, 0, buffer.Length );
}
if( queueTcs == null ) queueTcs = new TaskCompletionSource<Object>();
Task completedTask = await Task.WhenAny( nsReadTask, queueTcs.Task );
if( completedTask == nsReadTask )
{
while( ns.DataAvailable )
{
Int32 bytesRead = await ns.ReadAsync( buffer, 0, buffer.Length );
if( bytesRead == 0 ) break;
// ( process contents of `buffer` here )
} 
}
else if( completedTask == queueTcs )
{
while( !pendingMessagesToSend.IsEmpty )
{
// ( foreach Message, send down the NetworkStream here )
}
}
}
}
}
}

每当修改pendingMessagesToSend时,如果为 null,则会实例化queueTcs,并且SetResult(null)调用以取消等待Task completedTask = await Task.WhenAny( nsReadTask, queueTcs.Task );行。

自从构建此解决方案以来,我觉得我没有正确使用TaskCompletionSource,请参阅此 QA:是否可以使用 TaskCompletionSource 作为 WaitHandle 替代品?

最新更新