在 Parallel.Voke 中处理大型记录集时收到错误"任务已取消"



我正在使用 Parallel.Invoke 在 4 核机器上调用大量操作数组。 每个操作都会调用外部 Web API 以检索信息的 json 包。然后将该 json 包反序列化为一系列对象。然后,通过 EntityFramework 6 将其中每个对象插入到多个表中。

这将处理大约 2000 个不同的 ID,因此我正在尝试使用并行库来尽可能快地获得吞吐量。

我的主要:

private static void Main(string[] args)
{
var apiKey = "myKey";
List<string> caseIDs = new List<string>();
//read list of ids from DB
using (var db = new StagingContext())
{
caseIDs = db.BatchList.Where(b => b.CaseID!=null).Select(a => a.CaseID).Distinct().Take(5000).ToList();
}
List<Action> actions = new List<Action>();
foreach (var id in caseIDs)
{
var UniqueID = Guid.NewGuid();
actions.Add(() => GetRecords(id,"https://myAPIURL/{0}?api={1}&case={2}", apiKey, UniqueID));               
}
ParallelOptions op = new ParallelOptions
{
CancellationToken = tok.Token,
MaxDegreeOfParallelism = 10
};
Parallel.Invoke(op, actions.ToArray());           
Console.WriteLine("Done");
Console.ReadKey();
}

我的操作:

private static void GetRecords(string CaseID, string url, string apiKey, Guid UniqueID)
{
using (HttpClient client = new HttpClient())
{
var tmpUrl = string.Format(url, apiKey, CaseID);
client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
var result = client.GetAsync(tmpUrl).Result;
var jsonString = result.Content.ReadAsStringAsync();
jsonString.Wait();
var myObjectList = new List<MyObject>();
if (!jsonString.Result.Contains("error"))
{
myObjectList.AddRange(JsonConvert.DeserializeObject<List<MyObject>>(jsonString.Result));
foreach (var item in myObjectList)
{
item.UniqueID = UniqueID;
}
}
//Write this out to DB
using (var db = new StagingContext())
{
var myMappedObjectList = myObjectList.Adapt<List<MyObject>>();
db.CaseAttributeHistories.AddRange(myMappedObjectList);
using (var scope = new TransactionScope(TransactionScopeOption.Required, new TransactionOptions { IsolationLevel = IsolationLevel.ReadUncommitted }))
{
db.SaveChanges();
scope.Complete();
}
}
}
}

当我处理较小的数据集(~1000 条记录)时,它的效果非常好。当我处理一个更大的数据集,>1400时,我经常得到一个

"任务被取消了。">

错误。

我是并行和多线程的新手。

  • 这是一种有效的方法吗?
  • 有没有一种好方法来追踪什么是 导致取消?
  • 我将如何处理/忽略错误和 继续其余记录?
  • 在这种情况下,是否有更好或更快的模式?

首先,检查异常。吞下异常是异常处理的致命罪过。不幸的是,多线程完全自动完成。通常你必须为此编写代码。在多线程处理中,您必须编写代码来避免这种情况。在尝试多线程之前,我会建议这两篇关于异常处理的文章:

  • http://blogs.msdn.com/b/ericlippert/archive/2008/09/10/vexing-exceptions.aspx
  • http://www.codeproject.com/Articles/9538/Exception-Handling-Best-Practices-in-NET

其次,对 Web API 进行顺序调用通常是一个坏主意。请确认您没有办法批量检索数据,而不是逐个检索数据。逐段检索通常比数据产生更多的开销。

第三,你甚至可以在这种规模上自动化它吗?如果 APi 提供商不希望批量回收,他可能不希望实现这种规模的自动化。如果是这样,他可能会注意到负载的突然增加,并在以后应用一些负载限制。这可能会杀死你的程序。

第四,APi 调用的多线程可能不会加快速度。WEB API和网络将成为具有非常高可能性的瓶颈。多线程仅有助于解决 CPU 瓶颈操作。对于网络、磁盘、数据库和类似操作,性能增量通常为0。甚至性能下降,因为多个操作相互妨碍。

对于网络、磁盘和类似的长时间运行操作,必须进行一些多任务处理(即使只是一个备用线程)。但实际上多线程很少到永远无济于事。

我敢打赌异常是从client.GetAsync抛出的?

当 HTTP 调用超时时,HttpClient将抛出TaskCanceledException。(即 Web 服务没有响应)

烦人,我知道。

有可能,因为你打得太重了,它跟不上。您可以尝试提高HttpClientTimeout属性,但默认值已经是 100 秒。

如果您想忽略这些错误,请将client.GetAsync(tmpUrl)包装在 try/catch 块中并return(并可能将其记录在某个地方)。

相关内容

最新更新