如何使用Parallel.ForEach循环将文件上传到SFTP服务器以获得更好的性能



我尝试过Parallel.ForEach,但上传后有些文件的大小为零。使用普通的foreach循环可以正常工作,但速度很慢。共有400万个文件,每个文件的大小约为4 MB。

/// <summary>
/// Uploads every file found under <paramref name="filePath"/> (searched recursively)
/// whose name matches <paramref name="pattern"/> to <paramref name="ftpDirectory"/>
/// over a single SFTP connection. Files are uploaded one after another; the local
/// sub-directory structure is not reproduced remotely (only the file name is kept).
/// </summary>
/// <param name="ftpDirectory">Remote target directory. A trailing '/' is added when missing.</param>
/// <param name="filePath">Local root directory to search.</param>
/// <param name="pattern">Search pattern passed to the directory enumeration (e.g. "*.txt").</param>
/// <param name="coninfo">Connection settings used to create the SFTP client.</param>
/// <returns>
/// <c>true</c> when every file was uploaded; <c>false</c> when at least one upload
/// failed (the failure message is written to the console and the loop continues).
/// </returns>
public static bool UploadFiles(
    string ftpDirectory, string filePath, ConnectionInfo coninfo, string pattern)
{
    // Guarantee a separator between the remote directory and the file name; without it
    // "/upload" + "a.txt" would silently become "/uploada.txt".
    string remoteDirectory = ftpDirectory;
    if (!string.IsNullOrEmpty(remoteDirectory) && !remoteDirectory.EndsWith("/", StringComparison.Ordinal))
    {
        remoteDirectory += "/";
    }

    bool allUploaded = true;
    using (var client = new SftpClient(coninfo))
    {
        client.Connect();
        client.ConnectionInfo.Timeout = TimeSpan.FromDays(2);
        client.KeepAliveInterval = TimeSpan.FromSeconds(60);

        // EnumerateFiles streams the paths lazily; GetFiles would first build an array
        // holding every path (millions of entries) before the first upload starts.
        foreach (var file in Directory.EnumerateFiles(filePath, pattern, SearchOption.AllDirectories))
        {
            try
            {
                // OpenRead requests read-only access, so read-only files can be uploaded too
                // (new FileStream(path, FileMode.Open) asks for read/write access).
                using (Stream inputStream = File.OpenRead(file))
                {
                    client.UploadFile(inputStream, remoteDirectory + Path.GetFileName(file));
                }
            }
            catch (Exception ex)
            {
                // Best effort: keep going with the remaining files, but remember the failure
                // so the caller is not told that everything succeeded.
                allUploaded = false;
                Console.WriteLine(ex.Message);
            }
        }
        client.Disconnect();
    }
    return allUploaded;
}

我已将WinSCP库用于Parallel.ForEach循环。它运行良好。https://winscp.net/eng/docs/library_example_parallel_transfers

/// <summary>
/// Uploads every file found under <paramref name="localPath"/> (searched recursively)
/// whose name matches <paramref name="pattern"/> to <paramref name="remotePath"/>,
/// using several concurrent upload sessions. Each worker task opens its own session
/// on first use and repeatedly pulls the next file from a shared enumerator.
/// Any error is caught and written to the console; nothing is rethrown.
/// </summary>
/// <param name="remotePath">Remote target directory.</param>
/// <param name="localPath">Local root directory to search.</param>
/// <param name="pattern">Search pattern passed to the directory enumeration (e.g. "*.txt").</param>
public void UploadFiles(string remotePath, string localPath, string pattern)
{
    try
    {
        SessionOptions sessionOptions = new SessionOptions
        {
            Protocol = Protocol.Sftp,
            HostName = _sftpConfig["Host"],
            UserName = _sftpConfig["UserName"],
            Password = "",
            SshHostKeyFingerprint = "ssh-rsa 2048 xxx",
            SshPrivateKeyPath = _sftpConfig["SftpPrivateKeyPath"]
        };

        // Number of concurrent upload sessions.
        const int batches = 3;
        // UTC avoids a wrong elapsed time when the local clock shifts (e.g. DST).
        DateTime started = DateTime.UtcNow;
        int count = 0;
        Int64 bytes = 0;
        // Guards the shared enumerator and the count/bytes totals.
        object queueLock = new object();

        Console.WriteLine("Starting files enumeration...");
        // Lazy enumeration: paths are produced as workers ask for them instead of
        // building one array with every path up front.
        IEnumerable<string> files = Directory.EnumerateFiles(localPath, pattern, SearchOption.AllDirectories);
        using (IEnumerator<string> filesEnumerator = files.GetEnumerator())
        {
            List<Task> tasks = new List<Task>();
            for (int i = 1; i <= batches; i++)
            {
                // Copy the loop variable so each lambda captures its own worker number.
                int no = i;
                Task task = new Task(() =>
                {
                    using (Session uploadSession = new Session())
                    {
                        while (true)
                        {
                            string localFilePath;
                            lock (queueLock)
                            {
                                if (!filesEnumerator.MoveNext())
                                {
                                    break;
                                }
                                localFilePath = filesEnumerator.Current;
                                bytes += new FileInfo(localFilePath).Length;
                                count++;
                            }

                            // Open the session lazily so a worker that gets no file never connects.
                            if (!uploadSession.Opened)
                            {
                                Console.WriteLine("Starting upload {0}...", no);
                                uploadSession.Open(sessionOptions);
                            }

                            // Remote paths always use '/', so combine with RemotePath rather than
                            // System.IO.Path, which would insert the local OS separator.
                            string remoteFilePath = RemotePath.Combine(remotePath, Path.GetFileName(localFilePath));
                            Console.WriteLine("Uploading {0} to {1} in {2}...", localFilePath, remoteFilePath, no);
                            uploadSession.PutFiles(
                                localFilePath, RemotePath.EscapeFileMask(remoteFilePath)).
                                Check();
                        }

                        if (uploadSession.Opened)
                        {
                            Console.WriteLine("Upload {0} done", no);
                        }
                        else
                        {
                            Console.WriteLine("Upload {0} had nothing to do", no);
                        }
                    }
                });
                tasks.Add(task);
                task.Start();
            }

            Console.WriteLine("Waiting for uploads to complete...");
            // WaitAll returns (or throws) only after every worker has finished, so the
            // enumerator is no longer in use when it is disposed.
            Task.WaitAll(tasks.ToArray());
        }

        Console.WriteLine("Done");
        DateTime ended = DateTime.UtcNow;
        Console.WriteLine("Took {0}", (ended - started));
        Console.WriteLine("Uploaded {0} files, totaling {1:N0} bytes", count, bytes);
    }
    catch (Exception e)
    {
        Console.WriteLine("Error: {0}", e);
    }
}

最新更新