ChannelReader<T>.ReadAllAsync
在被CancellationToken取消时抛出任何异常吗?它似乎没有抛出OperationCanceledException/TaskCanceledException?
我知道如果这两个方法以一种即发即弃的方式调用,即_ = SendLoopAsync(); _ = ReceiveLoopAsync();
,它会使任务崩溃而没有显示消息/异常,因为它们没有等待,这意味着我们正在丢失异常。
我不希望它在没有让我知道它实际上已经崩溃/被取消的情况下崩溃,这意味着我应该在try/catch中包装整个SendLoopAsync,而不是在ReadAllAsync的分支之间。
如果能举一个小例子来说明它的行为,那就太好了。
var clientWebSocket = new ClientWebSocket();
await clientWebSocket.ConnectAsync(new Uri("wss://www.deribit.com/ws/api/v2"), CancellationToken.None).ConfigureAwait(false);
var client = new ChannelWebSocket(clientWebSocket);
for (var i = 1; i <= 10; i++)
{
client.Output.TryWrite($"Item: {i}");
}
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(1));
await client.StartAsync(cts.Token).ConfigureAwait(false); // blocks the UI
Console.ReadLine();
public class ChannelExample
{
private readonly WebSocket _webSocket;
private readonly Channel<string> _input;
private readonly Channel<string> _output;
public ChannelExample(WebSocket webSocket)
{
_webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
_input = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleWriter = true
});
_output = Channel.CreateUnbounded<string>(new UnboundedChannelOptions
{
SingleReader = true
});
}
public ChannelReader<string> Input => _input.Reader;
public ChannelWriter<string> Output => _output.Writer;
public async Task StartAsync(CancellationToken cancellationToken)
{
var receiving = ReceiveLoopAsync(cancellationToken);
var sending = SendLoopAsync(cancellationToken);
var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (completedTask.Exception != null)
{
Console.WriteLine("Exception");
}
}
private async Task SendLoopAsync(CancellationToken cancellationToken)
{
await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine($"Sending: {message}");
await Task.Delay(5000, cancellationToken).ConfigureAwait(false);
}
}
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
using var buffer = MemoryPool<byte>.Shared.Rent();
while (_webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested)
{
ValueWebSocketReceiveResult receiveResult;
do
{
receiveResult = await _webSocket.ReceiveAsync(buffer.Memory, cancellationToken).ConfigureAwait(false);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
return;
}
} while (!receiveResult.EndOfMessage);
}
}
}
我怀疑它会抛出;当然,您总是可以对此进行测试,但是—这是本场景中一般期望的模式。所以你可以用:
try
{
// ...
}
catch (OperationCancelledException) when (cancellationToken.IsCancellationRequested)
{
// treat as completion; swallow
}
或者:您可以将CancellationToken.None
传递到通道读取API,并仅使用写入器的完成来表示退出(确保退出时在写入器上调用.Complete(...)
)。
ReadAllAsync
可能不是这里首选的API,因为您并不真正需要它作为IAsyncEnumerable<T>
-所以最好使用本机通道API,即
while (await _output.Reader.WaitToReadAsync(cancellationToken))
{
while (_output.Reader.TryRead(out var message))
{
// ...
}
}
我不确定StartAsync
返回的Task
代表什么:
public async Task StartAsync(CancellationToken cancellationToken)
{
var receiving = ReceiveLoopAsync(cancellationToken);
var sending = SendLoopAsync(cancellationToken);
var completedTask = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
if (completedTask.Exception != null)
{
Console.WriteLine("Exception");
}
}
它似乎代表receiving
和sending
任务中的任何一个任务的完成,这很奇怪。这很可能是尝试记录任务异常的意外结果。有比这更好的方法来记录任务异常,最简单的方法是将异步方法中的所有代码封装在try
/catch
块中。除此之外,Task
的Exception
属性仅在任务IsFaulted
时不为空,而在任务IsCanceled
时不为空。