在单线程C#控制台应用程序中,我可以安全地使用async void异步处理队列中的多个项目吗



我和我的团队支持几个同步处理队列消息的后台工作控制台应用程序。

目前,当队列中的消息数量超过特定阈值时,我们使用docker来处理多条消息,从而启动应用程序的新实例。

我们正在讨论并行处理这些消息的替代方法,一位队友建议在while循环中使用顶级的async void方法来处理这些消息。

它看起来会起作用,因为它类似于使用async void作为事件处理程序的基于UI的应用程序。

然而,我个人从未写过一个后台工作人员以这种方式并行处理多条消息,我想知道是否有人有这样做的经验,如果有,是否有任何";gotchas";我们没有想到的。

以下是建议解决方案的简化版本:

static async void Main(string[] args)
{
while (true)
{
TopLevelHandler(await ReceiveMessage());
}
}
static async void TopLevelHandler(object message)
{
await DoAsyncWork(message);
}
static async Task<object> ReceiveMessage()
{
//fetch message from queue
return new object();
}
static async Task DoAsyncWork(object message)
{
//processing here
}

我能安全地使用async void吗。。。

这取决于你的意思是"安全地";。async void方法有特定的行为,所以如果你喜欢这些行为,那么async void也可以

  1. async void操作无法等待,因此您不知道它们何时完成。如果您要求在终止程序之前等待所有挂起的异步操作完成,那么async void对您不利
  2. async void操作中引发的未处理异常将在async void方法启动时捕获的SynchronizationContext上重新引发。由于您有一个Console应用程序,因此没有SynchronizationContext,因此将在ThreadPool上引发错误。这意味着您的应用程序将引发AppDomain.UnhandledException事件,然后崩溃。如果您要求应用程序不在随机时刻崩溃,那么async void对您不利

async void方法最初是为了使async事件处理程序成为可能而发明的。在现代实践中,这仍然是它们的主要用法。

使async void成为一个有趣选择的一个利基场景是,您有一个工作流,当它完成时,它必须启动另一个工作流。如果踢腿对应用程序的持续运行至关重要,而不踢腿会使应用程序处于挂起状态,那么将此故障升级为进程崩溃事件是有意义的。可以说,崩溃的程序比挂起的程序要好。您可以在此处和此处看到针对该场景使用async void的两个示例。在第二个例子中,async void是传递给ThreadPool.QueueUserWorkItem方法的lambda,确保async void不会捕获任何未知的环境SynchronizationContext

对于最新的NET,默认情况下我们有async Main方法。

此外,如果您没有该功能,可以将Main标记为异步。

这里的主要风险是,你可能会错过例外情况,并且必须非常小心地处理创建的任务。如果您将在while (true)循环中创建任务,那么很可能在某个时刻(当有人错误地使用某个阻塞调用时(出现线程池饥饿。

下面的示例代码显示了shuold的想法,但我最确信的是,还会有更多的复杂性:

using System.Collections.Concurrent;
using System.Security.Cryptography.X509Certificates;
namespace ConsoleApp2;
public static class Program
{
/// <summary>
/// Property to track all running tasks, should be dealt with carefully.
/// </summary>
private static ConcurrentBag<Task> _runningTasks = new();
static Program()
{
// Subscribe to an event.
TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
}
static async Task Main(string[] args)
{
var messageNo = 1;
while (true)
{
// Just schedule the work
// Here you should most probably limit number of
// tasks created.
var task = ReceiveMessage(messageNo)
.ContinueWith(t => TopLevelHandler(t.Result));
_runningTasks.Add(task);
messageNo++;
}
}
static async Task TopLevelHandler(object message)
{
await DoAsyncWork(message);
}
static async Task<object> ReceiveMessage(int messageNumber)
{
//fetch message from queue
await Task.Delay(5000);
return await Task.FromResult(messageNumber);
}
static async Task DoAsyncWork(object message)
{
//processing here
Console.WriteLine($"start processing message {message}");
await Task.Delay(5000);

// Only when you want to test catching exception
// throw new Exception("some malicious code");
Console.WriteLine($"end processing message {message}");
}
/// <summary>
/// Here you handle any unosberved exception thrown in a task.
/// Preferably you should handle somehow all running work in other tasks.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static async void TaskScheduler_UnobservedTaskException(object sender, UnobservedTaskExceptionEventArgs e)
{
// Potentially this method could be enter by multiple tasks having exception.
Console.WriteLine($"Exception caught: {e.Exception.InnerException.Message}");
await Task.WhenAll(_runningTasks.Where(x => !x.IsCompleted));
}
}

尝试使用带有Timer的IHostedService来调用后台工作。请使用此链接https://learn.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-6.0&tabs=visualstudio了解更多详细信息。

最新更新