立即捕获异常不适用于Task.WhenAll



我有两个类的实例,它创建了一个UDP套接字来从UDP客户端接收数据。如果其中一个实例抛出异常,我希望立即在更高层处理它。在我的程序中,它们是从await Task.WhenAll(recv1.StartAsync(), recv2.StartAsync)开始的。但是,这将等待所有任务在引发第一个异常之前完成。关于如何解决这个问题有什么想法吗?

static async Task Main(string[] args)
{
var udpReceiver1 = new UdpReceiver(localEndpoint1);
var udpReceiver2 = new UdpReceiver(localEndpoint2);
var cts = new CancellationTokenSource();
try
{
await Task.WhenAll(udpReceiver1.StartAsync(cts.Token), udpReceiver2.StartAsync(cts.Token));
}
catch (Exception e)
{
// Handle Exception...
cts.Cancel();
}
}
class UdpReceiver
{
public UdpReceiver(IPEndPoint endpoint)
{
udpClient = new UdpClient(endpoint);
}
public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
var result = await ReceiveAsync(cancellationToken);
var message = Encoding.UTF8.GetString(result.Buffer);
Trace.WriteLine($"UdpClient1 received message:{Encoding.UTF8.GetString(result.Buffer)}");

// throw new Exception("UdpClient1 raising exception");
}
}
}
private async Task<UdpReceiveResult> ReceiveAsync(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<UdpReceiveResult>();
using (cancellationToken.Register(() => tcs.TrySetCanceled(), false))
{
var task = udpClient.ReceiveAsync();
var completedTask = await Task.WhenAny(task, tcs.Task);
var result = await completedTask.ConfigureAwait(false);
return result;
}
}
private UdpClient udpClient;
}

更新1:任务。WhatAny将是一个可行的解决方案。感谢@CamiloTerevinto

try
{
await await Task.WhenAny(udpReceiver1.StartAsync(cts.Token), udpReceiver2.StartAsync(cts.Token));
}
catch (Exception e)
{
// Handle Exception...
cts.Cancel();
}

更新2:为了对所有任务进行更细粒度的异常处理,我将采用@Servy提出的Task.WhenAll的自适应实现。

该行为与框架WhenAll实现有很大不同,因此您最好只编写自己的改编版本,幸运的是,实现起来并不特别困难。只需为每个任务附加一个延续,如果任何任务被取消或出现故障,则生成的任务也会执行相同的操作,如果成功,则存储结果,如果最后一个任务成功,则使用所有存储的结果完成任务。

public static Task<IEnumerable<TResult>> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks)
{
var listOfTasks = tasks.ToList();
if (listOfTasks.Count == 0)
{
return Task.FromResult(Enumerable.Empty<TResult>());
}
var tcs = new TaskCompletionSource<IEnumerable<TResult>>();
var results = new TResult[listOfTasks.Count];
int completedTasks = 0;
for (int i = 0; i < listOfTasks.Count; i++)
{
int taskIndex = i;
Task<TResult> task = listOfTasks[i];
task.ContinueWith(_ =>
{
if (task.IsCanceled)
tcs.TrySetCanceled();
else if (task.IsFaulted)
tcs.TrySetException(task.Exception.InnerExceptions);
else
{
results[taskIndex] = task.Result;
if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
{
tcs.TrySetResult(results);
}
}
});
}
return tcs.Task;
}

与许多基于任务的通用操作一样,你也需要一个没有结果的版本,如果你不想处理显著的开销,你真的只需要复制粘贴基于结果的方法,但所有的结果都被撕掉了,这并不难,只是不雅。将所有这些任务转化为任务也会起作用,但对于这样的操作,开销可能会有问题。

public static Task WhenAll(IEnumerable<Task> tasks)
{
var listOfTasks = tasks.ToList();
if (listOfTasks.Count == 0)
{
return Task.CompletedTask;
}
var tcs = new TaskCompletionSource<bool>();
int completedTasks = 0;
for (int i = 0; i < listOfTasks.Count; i++)
{
int taskIndex = i;
Task task = listOfTasks[i];
task.ContinueWith(_ =>
{
if (task.IsCanceled)
tcs.TrySetCanceled();
else if (task.IsFaulted)
tcs.TrySetException(task.Exception.InnerExceptions);
else
{
if (Interlocked.Increment(ref completedTasks) == listOfTasks.Count)
{
tcs.TrySetResult(true);
}
}
});
}
return tcs.Task;
}

可能有一种方法可以做到这一点,但我想不出一种不让代码变得非常混乱的方法。最好在实际任务中处理异常。如果需要使用公共代码处理它,请使用处理程序委托。

static async Task Main(string[] args)
{
var cts = new CancellationTokenSource();
//This is our common error handler
void HandleException(Exception ex)
{
Log("Exception!" + ex.Message);
cts.Cancel();
}
var udpReceiver1 = new UdpReceiver(localEndpoint1);
var udpReceiver2 = new UdpReceiver(localEndpoint1);
//We pass the handler as one of the arguments
await Task.WhenAll(udpReceiver1.StartAsync(cts.Token, HandleException), udpReceiver2.StartAsync(cts.Token, HandleException));
}
class UdpReceiver
{
public async Task StartAsync(CancellationToken cancellationToken, Action<Exception> errorHandler)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
//Main logic goes here
}
}
catch(Exception ex)
{
errorHandler(ex);  //Call common error handling code
}
}

您可以分两步等待任务。在第一步中,等待任何完成,如果失败,则启动取消。不要在此步骤中处理异常。等待所有任务完成后,将异常处理延迟到第二步。这两个任务可能都失败了,所以您可能需要分别处理每个任务的异常。

Task task1 = udpReceiver1.StartAsync(cts.Token);
Task task2 = udpReceiver2.StartAsync(cts.Token);
// Await any task to complete
Task firstCompletedTask = await Task.WhenAny(task1, task2);
if (firstCompletedTask.IsFaulted) cts.Cancel();
try
{
// Await them all to complete
await Task.WhenAll(task1, task2);
}
catch
{
if (task1.IsFaulted) HandleException(task1.Exception.InnerException);
if (task2.IsFaulted) HandleException(task2.Exception.InnerException);
}

最新更新