平行.C#中带有重试逻辑的ForEach循环



我正在使用Parallel.ForEach将C#中的多个文件从google bucket下载到文件夹位置。我使用了重试逻辑,这样它就可以在下载过程中文件下载失败的情况下重试下载文件。如何为Parallel.ForEach循环中的每个文件或每个线程应用重试逻辑。

Parallel.ForEach(listFiles, objectName =>            
{
retryCount = 0;                        
countOfFiles++;
downloadSuccess = false;
bucketFileName = Path.GetFileName(objectName.Name);
guidFolderPath = tempFolderLocation + "\" + bucketFileName;
while (retryCount < retryCountInput && downloadSuccess == false)
{
try
{
FileStream fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write);
using (fs)
{                                               
storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress).Wait();
}
}
catch (Exception ex)
{
Console.WriteLine("Exception occured while downloading file: " + ex.ToString());                   
Thread.Sleep(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
retryCount++;
}
}
}

我会将其更改为任务并使用异步。这是你的线索。睡眠不会阻塞线程池线程。平行。ForEach用于CPU绑定工作。

类似于:(如果没有你的其余代码,我无法编译/测试它(

int retryCountInput = 5;
var tasks = new List<Task>();
foreach (var file in listFiles)
{
var task = Task.Run(async () =>
{
// make it local
int retryCount = 0;
string bucketFileName = Path.GetFileName(objectName.Name);
string guidFolderPath = tempFolderLocation + "\" + bucketFileName;
while (retryCount < retryCountInput)
{
try
{
using (var fs = new FileStream(guidFolderPath, FileMode.Create, FileAccess.Write, FileShare.Write))
// Use await here, instead of `Wait()` so this threadpool thread
// can be used for other tasks.
await storage.DownloadObjectAsync(bucketName, objectName.Name, fs, option, cancellationToken, progress);
break;
}
catch (Exception ex)
{
Console.WriteLine("Exception occured while downloading file: " + ex.ToString());
// Use Task.Delay here, so this thread is 'released'
await Task.Delay(RetryInterval(retryCount, minBackoffTimeSpan, maxBackoffTimeSpan, deltaBackoffTimeSpan));
retryCount++;
}
}
});
tasks.Add(task);
}
await Task.WhenAll(tasks);

我修改了代码并删除了Parallel.ForEach,而不是使用foreach循环来遍历文件。但现在,我无法在下载路径中找到所有文件,尽管日志显示所有文件都已下载。下载路径中已下载文件的数量发生变化,这种行为似乎是随机的。我可以使用Task.Run进行I/O操作吗?

var tasks = new List<Task>();
foreach (var objectName in listFiles)
{
var task = Task.Run(() =>
{
downloadSuccess = false;
bucketFileName = Path.GetFileName(objectName.Name);
guidFolderPath = tempFolderLocation + "\" + bucketFileName;
var maxRetryAttempts = 3;
var pauseBetweenFailures = TimeSpan.FromSeconds(2);
RetryHelper.RetryOnException(maxRetryAttempts, pauseBetweenFailures, async () =>
{
FileStream fs = new FileStream(guidFolderPath, FileMode.Create,
FileAccess.Write, FileShare.Write);
using (fs)
{
var progress = new Progress<IDownloadProgress>(
p =>
{
DownloadProgress(p, retryCount, objectName.Name);
});
await client.DownloadObjectAsync(bucketName, objectName.Name,
fs, option, cancellationToken.Token, progress);
}
});
});
tasks.Add(task);
}
await Task.WhenAll(tasks);

最新更新