我刚刚开始学习c#线程和并发集合,我不确定用什么术语来表达我的问题,所以我将简要描述一下我正在尝试做的事情。在这一点上,我对这门学科的掌握至多是初步的。我下面的方法是否如我所设想的那样可行?
-
我在一个必须测试的并发集合中有100,000个url -链接是否仍然良好?我有另一个并发集合,最初为空,它将包含异步请求确定已移动的url子集(400,404等错误)。
-
我想在我的PC和我们的带宽允许的情况下并发地产生尽可能多的这些异步请求,并且打算从每秒20个async-web-request-tasks开始,并从那里开始工作。
如果一个异步任务处理这两件事,它会工作吗?如果遇到4xx错误,它会发出异步请求,然后将url添加到BadUrls集合?该任务的一个新实例将每50ms产生一次:
class TestArgs args {
ConcurrentBag<UrlInfo> myCollection { get; set; }
System.Uri currentUrl { get; set; }
}
ConcurrentQueue<UrlInfo> Urls = new ConncurrentQueue<UrlInfo>();
// populate the Urls queue
<snip>
// initialize the bad urls collection
ConcurrentBag<UrlInfo> BadUrls = new ConcurrentBag<UrlInfo>();
// timer fires every 50ms, whereupon a new args object is created
// and the timer callback spawns a new task; an autoEvent would
// reset the timer and dispose of it when the queue was empty
void SpawnNewUrlTask(){
// if queue is empty then reset the timer
// otherwise:
TestArgs args = {
myCollection = BadUrls,
currentUrl = getNextUrl() // take an item from the queue
};
Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);
}
public async Task asyncWebRequestAndConcurrentCollectionUpdater(TestArgs args)
{
//make the async web request
// add the url to the bad collection if appropriate.
}
可行吗?路要走?
这种方法看起来不错,但是您所展示的特定代码存在一些问题。
但在我开始之前,有人在评论中建议任务并行是可行的。我认为这是一种误导。有一个常见的误解是,如果您希望并行进行大量工作,就必须需要大量线程。只有当工作与计算机有关时,这才是正确的。但是您所做的工作将是受IO约束的——这段代码将花费大部分时间等待响应。它只需要很少的计算。因此,在实践中,即使它只使用一个线程,您的初始目标每秒20个请求似乎不会导致单个CPU核心出汗的工作负载。
简而言之,单个线程可以处理非常高级别的并发IO。只有在需要并行执行代码时才需要多个线程,而这里看起来不太可能是这种情况,因为在这个特定的任务中CPU的工作很少。
(这个误解比await
和async
早几年。事实上,它早于TPL——请参阅http://www.interact-sw.co.uk/iangblog/2004/09/23/threadless获取。net 1.1时代的示例,说明如何使用少量线程处理数千个并发请求。基本原理今天仍然适用,因为Windows网络IO基本上仍然以相同的方式工作。
并不是说在这里使用多个线程有什么特别的错误,我只是指出它有点分散注意力。
无论如何,回到你的代码。这一行有问题:Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);
虽然你没有给我们你所有的代码,我看不出这将如何能够编译。接受两个参数的StartNew
的过载要求第一个参数是Action
、Action<object>
、Func<TResult>
或Func<object,TResult>
。换句话说,它必须是一个方法,要么不接受参数,要么接受object
类型的单个参数(可能返回值,也可能不返回值)。你的'asyncWebRequestAndConcurrentCollectionUpdater'接受一个类型为TestArgs
的参数。
但是它不能编译并不是主要问题。这很容易解决。(例如,将其更改为Task.Factory.StartNew(() => asyncWebRequestAndConcurrentCollectionUpdater(args));
)真正的问题是你正在做的事情有点奇怪:你正在使用Task.StartNew
调用一个已经返回Task
的方法。
Task.StartNew
是采用同步方法(即不返回Task
的方法)并以非阻塞方式运行它的方便方法。(它将在线程池上运行。)但是如果你有一个已经返回Task
的方法,那么你真的不需要使用Task.StartNew
。如果我们看看Task.StartNew
返回的内容(一旦修复了编译错误),这种奇怪就变得更加明显了:
Task<Task> t = Task.Factory.StartNew(
() => asyncWebRequestAndConcurrentCollectionUpdater(args));
Task<Task>
揭示了正在发生的事情。您决定用一种通常用于使非异步方法异步的机制包装一个已经异步的方法。所以你现在有一个Task
生成一个Task
。
StartNew
返回的任务完成,那么底层的工作不一定会完成:
t.Wait(); // doesn't wait for asyncWebRequestAndConcurrentCollectionUpdater to finish!
实际上所做的就是等待asyncWebRequestAndConcurrentCollectionUpdater
返回Task
。由于asyncWebRequestAndConcurrentCollectionUpdater
已经是一个异步方法,它将或多或少立即返回一个任务。(具体来说,它将在执行await
时返回一个没有立即完成的任务。)
如果你想等待你已经开始的工作完成,你需要这样做:
t.Result.Wait();
或者,可能更有效地,这样:
t.Unwrap().Wait();
那说:让我的Task
,我的异步方法返回,然后等待。这可能与下面这个简单得多的代码没有什么不同:
Task t = asyncWebRequestAndConcurrentCollectionUpdater("foo");
... maybe queue up some other tasks ...
t.Wait();
通过引入' Task.Factory.StartNew',您可能没有获得任何有用的信息。
我说"可能"是因为有一个重要的限定条件:这取决于你开始工作的环境。默认情况下,c#生成的代码会尝试确保当async
方法在await
之后继续执行时,它是在await
最初执行的相同上下文中执行的。例如,如果你在WPF应用程序中,并且在UI线程上执行await
,当代码继续执行时,它将在UI线程上安排这样做。(您可以使用ConfigureAwait
禁用此功能。)
因此,如果你处于上下文本质上是序列化的情况下(要么因为它是单线程的,就像GUI应用程序中的情况一样),要么因为它使用类似于租赁模型的东西,例如特定ASP的上下文。. NET请求),通过Task.Factory.StartNew
将异步任务踢掉实际上可能是有用的,因为它使您能够逃避原始上下文。然而,你只是让你的生活变得更困难了——跟踪你的任务的完成有点复杂。你也可以通过在async
方法中使用ConfigureAwait
来达到同样的效果。
这可能无关紧要—如果您只尝试每秒管理20个请求,那么这样做所需的最小CPU工作量意味着您可能可以在一个线程上完全管理它。(另外,如果这是一个控制台应用程序,默认上下文将发挥作用,它使用线程池,所以你的任务将能够在任何情况下多线程运行。)
但是回到你的问题,对我来说,有一个单一的async
方法从队列中选择一个url,发出请求,检查响应,并在必要时向坏url集合添加一个条目,这似乎是完全合理的。并且,从计时器中启动事情似乎也是合理的——这将限制尝试连接的速度,而不会因缓慢的响应而陷入困境(例如,如果大量请求最终试图与脱机的服务器进行通信)。如果遇到一些异常情况,即连续出现数万个url,这些url都指向一个没有响应的服务器,那么可能有必要对正在运行的请求的最大数量引入上限。(与此相关的是,无论使用哪种HTTP API,您都需要确保不会达到每个客户端连接的限制——这可能最终会限制有效吞吐量。)
您将需要添加某种类型的完成处理—仅仅启动异步操作而不做任何事情来处理结果是不好的做法,因为您可能会以无处可去的异常告终。(在。net 4.0中,这些异常用于终止您的进程,但在。net 4.5中,默认情况下,来自异步操作的未处理异常将被忽略!)如果你最终决定值得通过Task.Factory.StartNew
启动,请记住,你已经结束了一个额外的包装层,所以你需要做一些像myTask.Unwrap().ContinueWith(...)
这样的事情来正确处理它。
当然可以。并发集合被称为"并发",因为它们可以被使用…通过多个线程并发,并对其行为进行一些保证。
ConcurrentQueue将确保插入其中的每个元素只提取一次(并发线程永远不会错误地提取相同的项,一旦队列为空,则所有项都已被线程提取)。
编辑:唯一可能出错的是50ms不足以完成请求,因此任务队列中积累了越来越多的任务。如果发生这种情况,你的记忆可能会被填满,但它还是会工作。所以,这是可行的。
无论如何,我想强调任务不是线程这一事实。即使你创建了100个任务,框架也会决定其中有多少是实际并发执行的。
如果你想对并行度有更多的控制,你应该使用异步请求。在你的评论中,你写了"异步web请求",但我不能理解你是否写异步只是因为它在不同的线程上,或者因为你打算使用异步API。如果您正在使用异步API,我希望看到一些附加到完成事件的处理程序,但是我看不到它,因此我假设您正在使用从异步任务发出的同步请求。如果您正在使用异步请求,那么使用任务是没有意义的,只需使用计时器来发出异步请求,因为它们已经是异步的。
当我说"异步请求"时,我指的是WebRequest这样的方法。GetResponseAsync和WebRequest.BeginGetResponse.
EDIT2:如果您想使用异步请求,那么您可以直接从计时器处理程序发出请求。BeginGetResponse
方法接受两个参数。第一个是回调过程,将调用它来报告请求的状态。您可以对所有请求传递相同的过程。第二个是用户提供的对象,它将存储请求的状态,您可以使用这个参数来区分不同的请求。你甚至可以不用计时器。比如:
private readonly int desiredConcurrency = 20;
struct RequestData
{
public UrlInfo url;
public HttpWebRequest request;
}
/// Handles the completion of an asynchronous request
/// When a request has been completed,
/// tries to issue a new request to another url.
private void AsyncRequestHandler(IAsyncResult ar)
{
if (ar.IsCompleted)
{
RequestData data = (RequestData)ar.AsyncState;
HttpWebResponse resp = data.request.EndGetResponse(ar);
if (resp.StatusCode != 200)
{
BadUrls.Add(data.url);
}
//A request has been completed, try to start a new one
TryIssueRequest();
}
}
/// If urls is not empty, dequeues a url from it
/// and issues a new request to the extracted url.
private bool TryIssueRequest()
{
RequestData rd;
if (urls.TryDequeue(out rd.url))
{
rd.request = CreateRequestTo(rd.url); //TODO implement
rd.request.BeginGetResponse(AsyncRequestHandler, rd);
return true;
}
else
{
return false;
}
}
//Called by a button handler, or something like that
void StartTheRequests()
{
for (int requestCount = 0; requestCount < desiredConcurrency; ++requestCount)
{
if (!TryIssueRequest()) break;
}
}