Proper pattern for awaiting Task.WhenAny(List<T>) when the list can have other tasks appended



It is not possible to await a List<Task> that is changing, because Task.WhenAny(List<Task>) takes a copy of the List<Task>.
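The copying behavior can be checked in isolation. The following is a minimal sketch, not part of the original demo: the class and method names are hypothetical, and the 200 ms delay is an arbitrary grace period chosen for illustration. A task that is already completed is added to the list after WhenAny has been called, and the WhenAny task still does not complete.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAnySnapshotDemo
{
    // Returns true if the WhenAny task completed, even though the only
    // finished task was added to the list after WhenAny was called.
    public static async Task<bool> LateTaskIsObservedAsync()
    {
        // A task that is never completed in this demo.
        var pending = new TaskCompletionSource<int>();
        var tasks = new List<Task<int>> { pending.Task };

        // WhenAny works on the tasks that are in the list at call time.
        Task<Task<int>> any = Task.WhenAny(tasks);

        // Added afterwards: already completed, but not seen by 'any'.
        tasks.Add(Task.FromResult(42));

        // Arbitrary grace period; 'any' is still waiting on 'pending'.
        await Task.Delay(200);
        return any.IsCompleted;
    }
}
```

Calling LateTaskIsObservedAsync() returns false, which is the same effect that makes the loop in the demo below wait for the 60-second task.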

What is the proper pattern for

List<Task> taskList = new List<Task>();
await Task.WhenAny(taskList);

when tasks can be added to the taskList after the first WhenAny has been called?

Full demo code that demonstrates the problem is below.

static readonly List<Task<int>> taskList = new List<Task<int>>();
static readonly Random rnd = new Random(1);

static async Task<int> RunTaskAsync(int taskID, int taskDuration)
{
    await Task.Yield();
    Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
    await Task.Delay(taskDuration);  // mimic some work
    return taskID;
}

static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
{
    // Add numTasks asynchronously to the taskList.
    // The first task is added synchronously, then we yield the adds to a worker.
    taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
    await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with one task
    // The remaining task runs are yielded to a worker thread.
    for (int i = 2; i <= numTasks; i++)
    {
        await Task.Delay(rnd.Next(minDelay, maxDelay));
        taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
    }
}

static async Task Main(string[] args)
{
    Stopwatch sw = new Stopwatch(); sw.Start();
    // Start a fire-and-forget task to create some running tasks
    var _ = AddTasksAsync(10, 1, 3000);
    // While there are tasks to complete, use the main thread to process them as they complete
    while (taskList.Count > 0)
    {
        var t = await Task.WhenAny(taskList);
        taskList.Remove(t);
        var i = await t;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }
    // All tasks have completed successfully - exit main thread
}

Console output, showing that the WhenAny() loop found all the other completed tasks only after it had found and removed the 60-second task.

Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Starting Task: 3 with a duration of 24 seconds
Starting Task: 4 with a duration of 15 seconds
Starting Task: 5 with a duration of 28 seconds
Starting Task: 6 with a duration of 21 seconds
Starting Task: 7 with a duration of 11 seconds
Starting Task: 8 with a duration of 29 seconds
Starting Task: 9 with a duration of 21 seconds
Starting Task: 10 with a duration of 20 seconds
Task 1 found to be completed at: 00:01:00.1305811
Task 2 found to be completed at: 00:01:00.1312951
Task 3 found to be completed at: 00:01:00.1315689
Task 4 found to be completed at: 00:01:00.1317623
Task 5 found to be completed at: 00:01:00.1319427
Task 6 found to be completed at: 00:01:00.1321225
Task 7 found to be completed at: 00:01:00.1323002
Task 8 found to be completed at: 00:01:00.1324379
Task 9 found to be completed at: 00:01:00.1325962
Task 10 found to be completed at: 00:01:00.1327377

Thanks!

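The problem with the code you've shown is that it has no sensible communication pipeline between the worker and the task creator. You need some kind of messaging mechanism to notify the worker about new tasks (and about when there are no more tasks) so that it can react to them. That is something you have to figure out for your concurrent system, and the exact implementation is tangential to the question, so I'll just assume that our worker has the methods OnTaskAdded(Task task) and OnEnd().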

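From what you're saying, you don't really want to wait until any task completes, but rather to execute something for each task when it completes. See the updated answer below. This can be achieved with ContinueWith: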

class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();

    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.

    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = task.ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
        _tasks.Add(taskWithContinuation);
    }

    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }
}

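EDIT: After all that moralizing about ensuring a sensible messaging pipeline, I figured I can actually give you a quick-and-dirty implementation so you can see that it works: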

// DISCLAIMER: NOT PRODUCTION CODE!!!
public static async Task Main()
{
    Stopwatch sw = new Stopwatch(); sw.Start();
    // Start a fire-and-forget task to create some running tasks
    var _ = AddTasksAsync(10, 1, 3000);
    var internalList = new List<Task>();
    // While there are tasks to complete, use the main thread to process them as they complete
    var i = 0;
    while (i < 10)
    {
        while (taskList.Count <= i)
        {
            // No new tasks, check again after a delay -- THIS IS VERY BAD!
            await Task.Delay(100);
        }
        Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
        var taskWithContinuation = taskList[i].ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
        internalList.Add(taskWithContinuation);
        ++i;
    }
    await Task.WhenAll(internalList);
}

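Let me stress this again: this is not production-quality code! Actively polling for more tasks, ugh. Its output looks something like this: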

Task 1 intercepted at: 00:00:00.0495570
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8459622
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2626124
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2257285
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3058738
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6376981
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7507146
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7107754
Task 2 found to be completed at: 00:00:12.8111589
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7883430
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.6707959
Task 7 found to be completed at: 00:00:21.6692276
Task 4 found to be completed at: 00:00:24.2125638
Task 3 found to be completed at: 00:00:31.2276640
Task 6 found to be completed at: 00:00:31.5908324
Task 10 found to be completed at: 00:00:34.5585143
Task 9 found to be completed at: 00:00:34.7053864
Task 5 found to be completed at: 00:00:38.2616534
Task 8 found to be completed at: 00:00:40.6372696
Task 1 found to be completed at: 00:01:00.0720695

You can see that the lines are a bit shuffled because of the nature of multithreaded work, but the timestamps are accurate.

UPDATE:

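Well, I'm pretty dumb, I just invited you into an anti-pattern. Using ContinueWith is dangerous, and it is also overcomplicated: async/await was introduced to free us from scheduling continuations manually. You can simply wrap your Task<int> in an operation that awaits it and logs the time.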

class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();

    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.

    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = ContinueWithLog(task);
        _tasks.Add(taskWithContinuation);
    }

    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }

    // Awaits the source task and logs its result together with the elapsed time.
    private async Task ContinueWithLog(Task<int> task)
    {
        var i = await task;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, _stopwatch.Elapsed);
    }
}

A quick-and-dirty PoC using your example code:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

class Program
{
    static readonly List<Task<int>> taskList = new List<Task<int>>();
    static readonly Random rnd = new Random(1);
    static readonly Stopwatch sw = new Stopwatch();

    static async Task<int> RunTaskAsync(int taskID, int taskDuration)
    {
        await Task.Yield();
        Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
        await Task.Delay(taskDuration);  // mimic some work
        return taskID;
    }

    static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
    {
        // Add numTasks asynchronously to the taskList.
        // The first task is added synchronously, then we yield the adds to a worker.
        taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
        await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with one task
        // The remaining task runs are yielded to a worker thread.
        for (int i = 2; i <= numTasks; i++)
        {
            await Task.Delay(rnd.Next(minDelay, maxDelay));
            taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
        }
    }

    public static async Task ContinueWithLog(Task<int> source)
    {
        var i = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }

    public static async Task Main()
    {
        sw.Start();
        // Start a fire-and-forget task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);
        var internalList = new List<Task>();
        // While there are tasks to complete, use the main thread to process them as they complete
        var i = 0;
        while (i < 10)
        {
            while (taskList.Count <= i)
            {
                // No new tasks, check again after a delay -- THIS IS VERY BAD!
                await Task.Delay(100);
            }
            Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
            internalList.Add(ContinueWithLog(taskList[i]));
            ++i;
        }
        await Task.WhenAll(internalList);
    }
}

Output:

Starting Task: 1 with a duration of 60 seconds
Task 1 intercepted at: 00:00:00.0525006
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8551382
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2687049
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2404507
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3325019
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6654663
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7809841
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7576237
Task 2 found to be completed at: 00:00:12.8151955
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7228579
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.5829039
Task 7 found to be completed at: 00:00:21.6848699
Task 4 found to be completed at: 00:00:24.2089671
Task 3 found to be completed at: 00:00:31.2300136
Task 6 found to be completed at: 00:00:31.5847257
Task 10 found to be completed at: 00:00:34.5550722
Task 9 found to be completed at: 00:00:34.6904076
Task 5 found to be completed at: 00:00:38.2835777
Task 8 found to be completed at: 00:00:40.6445029
Task 1 found to be completed at: 00:01:00.0826952

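This is the idiomatic way of achieving what you want. Sorry for misleading you with ContinueWith at first; it is unnecessary and error-prone, and now we both know it.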
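For completeness, here is a minimal sketch of how the Worker could be driven directly by the producer, so that no polling loop is needed. This is an illustration under assumptions, not the code from the answer: the names LoggingWorker, WorkerDemo and RunAsync are hypothetical, the lock and the recorded completion order are additions made so that the sketch can be called from several threads and checked, and the delays are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public sealed class LoggingWorker
{
    private readonly object _gate = new object();
    private readonly List<Task> _tasks = new List<Task>();
    private readonly List<int> _completionOrder = new List<int>();
    private readonly Stopwatch _stopwatch = Stopwatch.StartNew();

    // Called by the producer for every new task.
    public void OnTaskAdded(Task<int> task)
    {
        lock (_gate) { _tasks.Add(ContinueWithLogAsync(task)); }
    }

    // Called by the producer once no more tasks will be added.
    public async Task<IReadOnlyList<int>> OnEndAsync()
    {
        Task[] snapshot;
        lock (_gate) { snapshot = _tasks.ToArray(); }
        await Task.WhenAll(snapshot);
        lock (_gate) { return _completionOrder.ToArray(); }
    }

    private async Task ContinueWithLogAsync(Task<int> task)
    {
        var id = await task;
        lock (_gate) { _completionOrder.Add(id); }
        Console.WriteLine("Task {0} found to be completed at: {1}", id, _stopwatch.Elapsed);
    }
}

public static class WorkerDemo
{
    private static async Task<int> RunTaskAsync(int id, int milliseconds)
    {
        await Task.Delay(milliseconds); // mimic some work
        return id;
    }

    public static async Task<IReadOnlyList<int>> RunAsync()
    {
        var worker = new LoggingWorker();
        worker.OnTaskAdded(RunTaskAsync(1, 600)); // slow task is added first
        await Task.Delay(50);
        worker.OnTaskAdded(RunTaskAsync(2, 100));
        worker.OnTaskAdded(RunTaskAsync(3, 300));
        return await worker.OnEndAsync();         // no more tasks will be added
    }
}
```

With these delays the slow task 1 should be logged last even though it was added first, because each task carries its own awaiting wrapper and nothing waits on the list as a whole.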

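List<Task> is not a suitable container for this kind of job, because it does not support the notion of completion. As a result, you cannot determine whether more tasks are still going to be added to the list, so you never know when to stop waiting. There are multiple alternatives, though.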

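  1. BlockingCollection<Task>. The producer calls the methods Add and finally CompleteAdding, to signal that it has finished adding tasks. The consumer just enumerates GetConsumingEnumerable. Very simple, but blocking by nature (not async).
  2. BufferBlock<Task>. The producer calls the method SendAsync and finally Complete, to signal that it has finished adding tasks. The consumer enumerates asynchronously using the methods OutputAvailableAsync and TryReceive. Requires the TPL Dataflow package (for .NET Framework; it is included in .NET Core).
  3. Channel<Task>. The producer calls the method Writer.WriteAsync and finally Writer.Complete, to signal that it has finished adding tasks. The consumer enumerates asynchronously using the methods Reader.WaitToReadAsync and Reader.TryRead. Requires the System.Threading.Channels package (for .NET Framework; it is included in .NET Core).
  4. An IObservable<Task> + IObserver<Task> pair. The observer subscribes to the observable and then starts receiving notifications about new tasks. The last notification is onCompleted(), which signals that no more notifications are going to be produced. The Reactive Extensions library includes a wealth of methods for manipulating observables, and one of them is the Merge operator, which can be used for awaiting all the tasks, exploiting the fact that a Task<T> can be transformed into an IObservable<T> that produces a single onNext notification. This approach may seem quite eccentric, and it is probably not worth investing in learning this technology (the reactive programming paradigm) unless you frequently deal with incoming streams of data that you want to filter, transform, combine and so on.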
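As an illustration of the third option, the following is a minimal sketch of a Channel<Task<int>> with one producer and one consumer. The names ChannelDemo, RunAsync and HandleAsync are hypothetical and the delays are arbitrary. Note that the consumer does not await each task inline, which would stall behind a slow task; it starts a small awaiting wrapper per task and awaits all wrappers after the channel has been completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelDemo
{
    private static async Task<int> RunTaskAsync(int id, int milliseconds)
    {
        await Task.Delay(milliseconds); // mimic some work
        return id;
    }

    // Records the id of a task once that task has completed.
    private static async Task HandleAsync(Task<int> task, List<int> completed)
    {
        var id = await task;
        lock (completed) { completed.Add(id); }
    }

    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<Task<int>>();
        var completed = new List<int>();

        // Producer: writes tasks as they are started, then signals completion.
        var producer = Task.Run(async () =>
        {
            await channel.Writer.WriteAsync(RunTaskAsync(1, 600)); // slow task first
            await Task.Delay(50);
            await channel.Writer.WriteAsync(RunTaskAsync(2, 100));
            await channel.Writer.WriteAsync(RunTaskAsync(3, 300));
            channel.Writer.Complete();
        });

        // Consumer: WaitToReadAsync returns false once the writer has
        // completed and the channel has been drained.
        var handlers = new List<Task>();
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out var task))
            {
                handlers.Add(HandleAsync(task, completed));
            }
        }

        await producer;
        await Task.WhenAll(handlers);
        return completed; // ids in the order in which the tasks completed
    }
}
```

The completion signal from Writer.Complete is what the plain List<Task> lacks: the consumer loop ends on its own, without a hard-coded task count or a polling delay.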

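Update: In retrospect, the first three options cannot be used as is, because you also want to await the tasks. So my suggestion now is to use a TransformBlock<Task, Task> instead of a BufferBlock<Task>.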

var block = new TransformBlock<Task, Task>(async task =>
{
    try
    {
        await task;
    }
    catch { } // suppress exceptions
    return task;
});

Example of a producer that adds tasks to the block:

var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 10; i++)
    {
        await Task.Delay(100);
        Console.WriteLine($"Sending {i}");
        await block.SendAsync(Task.Delay(i * 100));
    }
    block.Complete();
});

Example of a consumer that receives the completed tasks from the block:

var consumer = Task.Run(async () =>
{
    while (await block.OutputAvailableAsync())
    {
        while (block.TryReceive(out var task))
        {
            Console.WriteLine($"Task Completed: {task.Status}");
        }
    }
});

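The tasks are received in the same order in which they were added to the block. If you want to receive them as soon as they complete, configure the block like this: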

new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = Int32.MaxValue,
    EnsureOrdered = false
}
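Putting the pieces together, the following is a minimal sketch of the block constructed with these options. It is an illustration under assumptions: the names UnorderedBlockDemo and RunAsync are hypothetical, Task<int> is used instead of Task so that the received order can be recorded, and the delays are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedBlockDemo
{
    private static async Task<int> RunTaskAsync(int id, int milliseconds)
    {
        await Task.Delay(milliseconds); // mimic some work
        return id;
    }

    public static async Task<List<int>> RunAsync()
    {
        // The options are passed as the second constructor argument.
        var block = new TransformBlock<Task<int>, Task<int>>(async task =>
        {
            try { await task; }
            catch { } // suppress exceptions, the consumer inspects the task
            return task;
        },
        new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = Int32.MaxValue,
            EnsureOrdered = false
        });

        await block.SendAsync(RunTaskAsync(1, 600)); // slow task is sent first
        await block.SendAsync(RunTaskAsync(2, 100));
        await block.SendAsync(RunTaskAsync(3, 300));
        block.Complete();

        var received = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var task))
            {
                // The task has already completed here; none of the tasks in
                // this demo fault, so awaiting it cannot throw.
                received.Add(await task);
            }
        }
        return received;
    }
}
```

With EnsureOrdered = false the slow task 1 should no longer hold back the others, so the received list is expected to start with task 2 rather than task 1.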
