我在一个简单的小控制台应用程序中使用c#中的async
和await
。我的目标很简单:以异步方式处理文件列表,这样对一个文件的处理就不会阻塞其他文件的处理。所有文件都不依赖于其他文件,并且(假设)有数千个文件要浏览。
这是我目前的代码。
public class MyClass
{
public void Go()
{
string[] fileSystemEntries = Directory.GetFileSystemEntries(@"PathToFiles");
Console.WriteLine("Starting to read from files!");
foreach (var filePath in fileSystemEntries.OrderBy(s => s))
{
Task task = new Task(() => DoStuff(filePath));
task.Start();
task.Wait();
}
}
private async void DoStuff(string filePath)
{
await Task.Run(() =>
{
Thread.Sleep(1000);
string fileName = Path.GetFileName(filePath);
string firstLineOfFile = File.ReadLines(filePath).First();
Console.WriteLine("{0}: {1}", fileName, firstLineOfFile);
});
}
}
我的Main()
方法只是调用这个类:
public static class Program
{
public static void Main()
{
var myClass = new MyClass();
myClass.Go();
}
}
这种异步编程模式似乎缺少了一些东西,因为每当我运行程序时,实际处理的文件数量似乎是随机的,从一个都没有到所有六个(在我的示例文件集中)。
基本上,主线程不会等待所有文件被处理,我想这是异步运行的一部分,但我不太希望这样。我想要的是:在尽可能多的线程中处理尽可能多的这些文件,但是在结束之前仍然等待它们全部完成处理。
async/await
背后的主要设计目标之一是促进自然异步I/O api的使用。在这种情况下,您的代码可以这样重写(未经测试):
public class MyClass
{
private int filesRead = 0;
public void Go()
{
GoAsync().Wait();
}
private async Task GoAsync()
{
string[] fileSystemEntries = Directory.GetFileSystemEntries(@"PathToFiles");
Console.WriteLine("Starting to read from files! Count: {0}", fileSystemEntries.Length);
var tasks = fileSystemEntries.OrderBy(s => s).Select(
fileName => DoStuffAsync(fileName));
await Task.WhenAll(tasks.ToArray());
Console.WriteLine("Finish! Read {0} file(s).", filesRead);
}
private async Task DoStuffAsync(string filePath)
{
string fileName = Path.GetFileName(filePath);
using (var reader = new StreamReader(filePath))
{
string firstLineOfFile =
await reader.ReadLineAsync().ConfigureAwait(false);
Console.WriteLine("[{0}] {1}: {2}", Thread.CurrentThread.ManagedThreadId, fileName, firstLineOfFile);
Interlocked.Increment(ref filesRead);
}
}
}
注意,它不会显式地生成任何新的线程,但这可能在await reader.ReadLineAsync().ConfigureAwait(false)
的后台发生。
为了得出我的解决方案,我结合了上面的注释。实际上,我根本不需要使用async
或await
关键字。我只需要创建一个任务列表,启动它们,然后调用WaitAll。不需要用async
或await
关键字装饰。结果代码如下:
public class MyClass
{
private int filesRead = 0;
public void Go()
{
string[] fileSystemEntries = Directory.GetFileSystemEntries(@"PathToFiles");
Console.WriteLine("Starting to read from files! Count: {0}", fileSystemEntries.Length);
List<Task> tasks = new List<Task>();
foreach (var filePath in fileSystemEntries.OrderBy(s => s))
{
Task task = Task.Run(() => DoStuff(filePath));
tasks.Add(task);
}
Task.WaitAll(tasks.ToArray());
Console.WriteLine("Finish! Read {0} file(s).", filesRead);
}
private void DoStuff(string filePath)
{
string fileName = Path.GetFileName(filePath);
string firstLineOfFile = File.ReadLines(filePath).First();
Console.WriteLine("[{0}] {1}: {2}", Thread.CurrentThread.ManagedThreadId, fileName, firstLineOfFile);
filesRead++;
}
}
在测试时,我添加了Thread.Sleep
调用,以及繁忙循环来将cpu固定在我的机器上。打开任务管理器,我观察到在繁忙循环期间所有的内核都是固定的,每次我运行程序时,文件都以不一致的顺序运行(这是一件好事,因为这表明唯一的瓶颈是可用线程的数量)。
每次运行程序,fileSystemEntries.Length
总是匹配filesRead
。
Parallel.ForEach
:
public class MyClass
{
private int filesRead;
public void Go()
{
string[] fileSystemEntries = Directory.GetFileSystemEntries(@"PathToFiles");
Console.WriteLine("Starting to read from files! Count: {0}", fileSystemEntries.Length);
Parallel.ForEach(fileSystemEntries, DoStuff);
Console.WriteLine("Finish! Read {0} file(s).", filesRead);
}
private void DoStuff(string filePath)
{
string fileName = Path.GetFileName(filePath);
string firstLineOfFile = File.ReadLines(filePath).First();
Console.WriteLine("[{0}] {1}: {2}", Thread.CurrentThread.ManagedThreadId, fileName, firstLineOfFile);
filesRead++;
}
}
现在c#中似乎有很多方法可以实现异步编程。在Parallel
和Task
以及async
/await
之间,有很多选择。基于这个线程,看起来对我来说最好的解决方案是Parallel
,因为它提供了最干净的解决方案,比自己手动创建Task
对象更有效,并且在实现类似结果的同时不会混淆async
和await
关键字的代码。