CancellationToken请求取消时,NamedPipeServerStream.ReadAsync()不会退出



NamedPipeServer流从管道读取任何数据时,不会对CancellationTokenSource.Cancel()做出反应

为什么?

如何限制我在服务器中等待来自客户端的数据的时间?

要复制的代码:

static void Main(string[] args)
{
Server();
Clinet();
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}
private static async Task Server()
{
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
{
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
}
}
private static async Task Clinet()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}

预期结果:

exit server
<client throws exception cuz server closed pipe>

实际结果:

client exit
exit server

编辑

CancelIo的答案似乎很有希望,而且它确实允许服务器在取消令牌时结束通信。然而,我不明白为什么我的"基本场景"在使用ReadPipeAsync时停止工作。

这是代码,它包括2个客户端功能:

  1. Clinet_ShouldWorkFine-一个及时读/写的好客户端
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow-客户端速度太慢,服务器应终止通信

预期:

  1. Clinet_ShouldWorkFine-执行结束,没有任何异常
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow-服务器关闭管道,客户端抛出异常

实际:

  1. Clinet_ShouldWorkFine-服务器在第一次调用ReadPipeAsync时停止,管道在1秒后关闭,客户端抛出异常
  2. Clinet_ServerShouldEndCommunication_CuzClientIsSlow-服务器关闭管道,客户端抛出异常

为什么服务器使用ReadPipeAsyncClinet_ShouldWorkFine不工作

class Program
{
static void Main(string[] args) {
// in this case server should close the pipe cuz client is too slow
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet_ServerShouldEndCommunication_CuzClientIsSlow();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}
// in this case server should exchange data with client fine
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet_ShouldWorkFine();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}
private static async Task Server()
{
using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
{
var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
var buffer = new byte[4];
var bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
var bytes2 = await server.ReadPipeAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
}
}
private static async Task Clinet_ShouldWorkFine()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}
private static async Task Clinet_ServerShouldEndCommunication_CuzClientIsSlow()
{
using (var client = new NamedPipeClientStream(".", "test", PipeDirection.InOut, PipeOptions.Asynchronous))
{
var buffer = new byte[4];
client.Connect();
client.Read(buffer, 0, 4);
client.Read(buffer, 0, 4);
await Task.Delay(5000);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
await client.WriteAsync(new byte[] {1, 2, 3, 4}, 0, 4);
Console.WriteLine("client exit");
}
}
}
public static class AsyncPipeFixer {
public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var async = pipe.BeginRead(buffer, offset, count, null, null);
return new Task<int>(() => {
try { return pipe.EndRead(async); }
finally { registration.Dispose(); }
}, cancellationToken);
}
private static void CancelPipeIo(PipeStream pipe) {
// Note: no PipeStream.IsDisposed, we'll have to swallow
try {
CancelIo(pipe.SafePipeHandle);
}
catch (ObjectDisposedException) { }
}
[DllImport("kernel32.dll")]
private static extern bool CancelIo(SafePipeHandle handle);
}

.NET程序员在编写这样的小测试程序时,会遇到严重的异步/等待问题。它的构图很差,一直都是乌龟。这个程序缺少最后一个乌龟,任务陷入僵局。没有人会像GUI应用程序中通常发生的那样,让任务继续执行。调试起来也异常困难。

首先做一个小改动,使死锁完全可见:

int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

这消除了一个令人讨厌的小角落案例,Server方法一直到"Server exited"消息。Task类的一个长期问题是,当任务完成或等待的方法同步完成时,它将尝试直接运行continuation。这恰好在这个程序中起作用。通过强制它获得异步结果,死锁现在是显而易见的。


下一步是修复Main(),使这些任务不再死锁。可能是这样的:

static void Main(string[] args) {
try {
var tasks = new Task[3];
tasks[0] = Server();
tasks[1] = tasks[0].ContinueWith(c => {
Console.WriteLine($"Server exited, cancelled={c.IsCanceled}");
});
tasks[2] = Clinet();
Task.WhenAll(tasks).Wait();
}
catch (Exception ex) {
Console.WriteLine(ex);
}
Console.WriteLine("press [enter] to exit");
Console.ReadLine();
}

现在我们有机会取得进展,并实际解决取消问题。NamedPipeServerStream类本身不实现ReadAsync,它继承了其基类Stream中的方法。它有一个破旧的小细节,完全没有得到充分的记录。只有当你盯着框架源代码看的时候,你才能看到它。它只能在调用ReadAsync()之前发生取消时检测取消。一旦开始读取,就再也看不到取消。你试图解决的终极问题。

这是一个可以解决的问题,我只是不清楚为什么微软没有为PipeStreams做这件事。强制BeginRead()方法提前完成的正常方法是Dispose()对象,这也是Stream.ReadAsync()中断的唯一方法。但还有另一种方法,在Windows上,可以使用CancelIo()中断I/O操作。让我们把它作为一种扩展方法:

using System;
using System.Threading.Tasks;
using System.Runtime.InteropServices;
using System.IO.Pipes;
using Microsoft.Win32.SafeHandles;
public static class AsyncPipeFixer {
public static Task<int> ReadPipeAsync(this PipeStream pipe, byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
var registration = cancellationToken.Register(() => CancelPipeIo(pipe));
var async = pipe.BeginRead(buffer, offset, count, null, null);
return new Task<int>(() => {
try { return pipe.EndRead(async); }
finally { registration.Dispose(); }
}, cancellationToken);
}
private static void CancelPipeIo(PipeStream pipe) {
// Note: no PipeStream.IsDisposed, we'll have to swallow
try {
CancelIo(pipe.SafePipeHandle);
}
catch (ObjectDisposedException) { }
}
[DllImport("kernel32.dll")]
private static extern bool CancelIo(SafePipeHandle handle);
}

最后调整服务器以使用它:

int bytes = await server.ReadPipeAsync(buffer, 0, 4, cancellationTokenSource.Token);

请注意,此解决方法是特定于Windows的,因此无法在针对Unix风格的.NETCore程序中工作。然后考虑更重的锤子,呼叫管道。CancelPipeIo()方法中的Close()。

Hans Passant的答案很理想。。。几乎唯一的问题是CancelIo()取消了来自同一线程的请求。如果任务在另一个线程上恢复,这将不起作用。不幸的是,我没有足够的声誉点来直接评论他的回答,因此单独回答。

因此,他的示例代码的最后一部分应该重写如下:

private static void CancelPipeIo(PipeStream pipe) {
// Note: no PipeStream.IsDisposed, we'll have to swallow
try {
CancelIoEx(pipe.SafePipeHandle);
}
catch (ObjectDisposedException) { }
}
[DllImport("kernel32.dll")]
private static extern bool CancelIoEx(SafePipeHandle handle, IntPtr _ = default);

请注意,CancelIoEx()在Vista/Server 2008及更高版本中可用,而CancelIo()在Windows XP中也可用。

ReadAsync首先检查取消,然后开始读取,如果令牌被取消,则它不会影响

添加以下行

cancelleToken.Register(server.Disconnect);

using (var cancellationTokenSource = new CancellationTokenSource(1000))
using (var server = new NamedPipeServerStream("test",
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous))
{
var cancellationToken = cancellationTokenSource.Token;
cancellationToken.Register(server.Disconnect);
await server.WaitForConnectionAsync(cancellationToken);
await server.WriteAsync(new byte[]{1,2,3,4}, 0, 4, cancellationToken);
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
Console.WriteLine("exit server");
}

我只是在看你的代码,也许会对它有一双新的眼睛…

据我所知,在你最初和后来更复杂的场景中。。。您正在传递一个已经取消的取消令牌,这是非常不可预测的,其他人如何实现(如果有的话)在方法中抛出的异常。。。

使用IsCancellationRequested属性检查令牌是否已取消,并且不要传递已取消的令牌。

这是一个将其从原始问题添加到代码中的示例(您可以对稍后的ReadPipeAsync方法执行同样的操作。

var cancellationToken = cancellationTokenSource.Token;
await server.WaitForConnectionAsync(cancellationToken);
if(!cancellationToken.IsCancellationRequested)
{
await server.WriteAsync(new byte[] { 1, 2, 3, 4 }, 0, 4, cancellationToken);
}
if(!cancellationToken.IsCancellationRequested)
{
var buffer = new byte[4];
await server.ReadAsync(buffer, 0, 4, cancellationToken);
}
Console.WriteLine("exit server");

以上代码将导致

exit server
client exit

我认为这也是你最原始的问题。。。

最新更新