如何将异步IO任务的数量限制到数据库?



我有一个id's列表,我想从数据库中并行获取每个id的数据。我的以下ExecuteAsync方法以非常高的吞吐量调用,对于每个请求,我们都有大约500 ids我需要提取数据的请求。

所以我在下面的代码中循环ids列表并并行地对每个id进行异步调用,并且工作正常。

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
{
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
var excludeNull = new List<T>(ids.Count);
for (int i = 0; i < responses.Length; i++)
{
var response = responses[i];
if (response != null)
{
excludeNull.Add(response);
}
}
return excludeNull;
}
private async Task<T> Execute<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> requestExecuter) where T : class
{
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => requestExecuter(ct), CancellationToken.None);
if (response.Outcome == OutcomeType.Failure)
{
if (response.FinalException != null)
{
// log error
throw response.FinalException;
}
}
return response?.Result;
}

问题:

现在如您所见,我正在循环所有ids,并为每个id并行对数据库进行一堆异步调用,这可能会给数据库带来大量负载(取决于有多少请求即将到来(。所以我想限制我们对数据库进行的异步调用的数量。我修改了ExecuteAsync以使用如下所示的Semaphore,但它看起来不像是我想要它做的事情:

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
{
await throttler.WaitAsync().ConfigureAwait(false);
try
{
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
finally
{
throttler.Release();
}
}
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
}

信号量适用于Threads还是Tasks?在这里阅读它看起来像信号量是用于线程的,而信号量Slim是用于任务的。

这是对的吗?如果是,那么解决此问题并限制我们在此处对数据库进行的异步 IO 任务数量的最佳方法是什么。

任务是线程上的抽象,不一定会创建新线程。信号量限制可以访问该 for 循环的线程数。执行返回一个不是线程的任务。如果只有 1 个请求,则 for 循环中将只有 1 个线程,即使它要求 500 个 id。1 线程本身发送所有异步 IO 任务。

有点。我不会说任务与线程完全相关。实际上有两种任务:委托任务(这是一种线程的抽象(和 promise 任务(与线程无关(。

关于SemaphoreSlim,它确实限制了代码块(不是线程(的并发性。

我最近开始使用 C#,所以我的理解不正确,看起来像 w.r.t 线程和任务。

我建议阅读我的async介绍和最佳实践。如果您更有兴趣了解线程如何不真正参与,请跟进没有线程。

我修改了 ExecuteAsync 以使用信号量,如下所示,但它看起来不像是我想要它做的事情

当前代码仅限制将任务添加到列表中,无论如何,一次只能执行一个任务。您要做的是限制执行本身:

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy, Func<CancellationToken, int, Task<T>> mapper) where T : class
{
var throttler = new SemaphoreSlim(250);
var tasks = new List<Task<T>>(ids.Count);
// invoking multiple id in parallel to get data for each id from database
for (int i = 0; i < ids.Count; i++)
tasks.Add(ThrottledExecute(ids[i]));
// wait for all id response to come back
var responses = await Task.WhenAll(tasks);
// same excludeNull code check here
return excludeNull;
async Task<T> ThrottledExecute(int id)
{
await throttler.WaitAsync().ConfigureAwait(false);
try {
return await Execute(policy, ct => mapper(ct, id)).ConfigureAwait(false);
} finally {
throttler.Release();
}
}
}

您的同事可能想到了Semaphore类,它确实是一个以线程为中心的限制器,没有异步功能。

限制可以并发访问资源或资源池的线程数。

SemaphoreSlim类是Semaphore的轻量级替代,它包括异步方法WaitAsync,它使世界变得不同。WaitAsync不会阻止线程,而是阻止异步工作流。异步工作流很便宜(通常每个小于 1000 字节(。您可以在任何给定时刻同时"运行"数百万个它们。线程的情况并非如此,因为每个线程为其堆栈保留了 1 MB 的内存。

至于ExecuteAsync方法,以下是如何使用 LINQ 方法重构它的方法SelectWhereToArrayToList


更新:Polly 库支持捕获并继续当前同步上下文,因此我添加了一个bool executeOnCurrentContext参数。我还将异步Execute方法重命名为ExecuteAsync,以符合准则。

private async Task<List<T>> ExecuteAsync<T>(IList<int> ids, IPollyPolicy policy,
Func<CancellationToken, int, Task<T>> mapper,
int concurrencyLevel = 1, bool executeOnCurrentContext = false) where T : class
{
var throttler = new SemaphoreSlim(concurrencyLevel);
Task<T>[] tasks = ids.Select(async id =>
{
await throttler.WaitAsync().ConfigureAwait(executeOnCurrentContext);
try
{
return await ExecuteAsync(policy, ct => mapper(ct, id),
executeOnCurrentContext).ConfigureAwait(false);
}
finally
{
throttler.Release();
}
}).ToArray();
T[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
return results.Where(r => r != null).ToList();
}
private async Task<T> ExecuteAsync<T>(IPollyPolicy policy,
Func<CancellationToken, Task<T>> function,
bool executeOnCurrentContext = false) where T : class
{
var response = await policy.Policy.ExecuteAndCaptureAsync(
ct => executeOnCurrentContext ? function(ct) : Task.Run(() => function(ct)),
CancellationToken.None, continueOnCapturedContext: executeOnCurrentContext)
.ConfigureAwait(executeOnCurrentContext);
if (response.Outcome == OutcomeType.Failure)
{
if (response.FinalException != null)
{
ExceptionDispatchInfo.Throw(response.FinalException);
}
}
return response?.Result;
}

您正在限制向列表中添加任务的速度。您不会限制任务的执行速率。为此,您可能必须在Execute方法本身中实现信号量调用。

如果您无法修改Execute,另一种方法是轮询已完成的任务,如下所示:

for (int i = 0; i < ids.Count; i++)
{
var pendingCount = tasks.Count( t => !t.IsCompleted );
while (pendingCount >= 500) await Task.Yield();
tasks.Add(Execute(policy, ct => mapper(ct, ids[i])));
}
await Task.WhenAll( tasks );

实际上,TPL 能够控制任务执行并限制并发性。您可以测试有多少并行任务适合您的用例。无需考虑线程,TPL 将为您管理一切。

要使用有限并发性,请参阅此答案,归功于 @panagiotis-kanavos

.Net TPL:具有任务优先级的有限并发级别任务调度程序?

示例代码是(即使使用不同的优先级,您也可以去除它(:

QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default,4);
TaskScheduler pri0 = qts.ActivateNewQueue(priority: 0);
TaskScheduler pri1 = qts.ActivateNewQueue(priority: 1);
Task.Factory.StartNew(()=>{ }, 
CancellationToken.None, 
TaskCreationOptions.None, 
pri0);

只需将所有任务扔到队列中,有了Task.WhenAll,您就可以等到一切都完成。

最新更新