我有一个方法,它采用IWorkItem
,开始工作并返回相关任务。由于使用了外部库,该方法必须看起来像这样。
public Task WorkOn(IWorkItem workItem)
{
//...start asynchronous operation, return task
}
我想在多个工作项上做这项工作。我不知道有多少人会去,也许有1个,也许1万。WorkOn
方法具有内部池,如果达到太多并行执行,可能需要等待。(如SemaphoreSlim.Wait
):
public Task WorkOn(IWorkItem workItem)
{
_semaphoreSlim.Wait();
}
我当前的解决方案是:
public void Do(params IWorkItem[] workItems)
{
var tasks = new Task[workItems.Length];
for (var i = 0; i < workItems.Length; i++)
{
tasks[i] = WorkOn(workItems[i]);
}
Task.WaitAll(tasks);
}
问题:我可以用some Parallel吗?在这种情况下,每个人?为了避免创建10000个任务,然后由于WorkOn
的节流而等待?
其实没那么容易。您可以使用Parallel.ForEach
来限制生成的任务数量。但我不确定它在你的情况下会有什么表现。
作为一般的经验法则,我通常尽量避免将Task
和Parallel
混合使用。
你当然可以这样做:
public void Do(params IWorkItem[] workItems)
{
Parallel.ForEach(workItems, (workItem) => WorkOn(workItem).Wait());
}
在"正常"条件下,这应该可以很好地限制并发性。
您也可以完全使用async
- await
,并使用一些技巧对您的并发性进行一些限制。但是在这种情况下,你必须自己限制并发性。
const int ConcurrencyLimit = 8;
public async Task Do(params IWorkItem[] workItems)
{
var cursor = 0;
var currentlyProcessing = new List<Task>(ConcurrencyLimit);
while (cursor < workItems.Length)
{
while (currentlyProcessing.Count < ConcurrencyLimit && cursor < workItems.Length)
{
currentlyProcessing.Add(WorkOn(workItems[cursor]));
cursor++;
}
Task finished = await Task.WhenAny(currentlyProcessing);
currentlyProcessing.Remove(finished);
}
await Task.WhenAll(currentlyProcessing);
}
正如我所说的…要复杂得多。但它也会将并发性限制为您所应用的任何值。此外,它正确地使用了async
- await
模式。如果你不想要非阻塞多线程,你可以很容易地将这个函数包装到另一个函数中,并对这个函数返回的任务执行阻塞.Wait
。
这个实现的关键是Task.WhenAny
函数。这个函数将在应用的任务列表中返回一个已完成的任务(由await
的另一个任务包装)。