那么,我有一个文件共享列表。然后我需要获得这些文件共享中的所有文件夹。这就是所有的"简单"。我做过的事。最后,经过一些逻辑,我将对象添加到集合中。
在后台使用Async/Await和Tasks时,我希望能够有另一个线程/任务启动,以便它可以继续通过集合并将数据写入磁盘。
现在,对于每个文件夹,我获得关于该文件夹的安全信息。至少会有一件物品。但是对于每个项目,我将这些信息添加到一个集合中(对于每个文件夹)。
我想在后台写入磁盘,直到没有更多的文件夹迭代,作业完成。
我正在考虑使用BlockingCollection
,但是这段代码确实有味道,并且最终没有关闭文件,因为while(true)
语句。
private static BlockingCollection<DirectorySecurityInformation> AllSecurityItemsToWrite = new BlockingCollection<DirectorySecurityInformation>();
if (sharesResults.Count > 0)
{
WriteCSVHeader();
// setup a background task which will dequeue items to write.
var csvBGTask = Task.Run(async () =>
{
using (var sw = new StreamWriter(FileName, true))
{
sw.AutoFlush = true;
while (true)
{
var dsi = AllSecurityItemsToWrite.Take();
await sw.WriteLineAsync("... blah blah blah...");
await sw.FlushAsync();
}
}
});
allTasks.Add(csvBGTask);
}
foreach(var currentShare in AllShares)
{
var dirs = Directory.EnumerateDirectories(currentShare .FullName, "*", SearchOption.AllDirectories);
foreach(var currentDir in dirs) { // Spin up a task in the BG and run to do some security analysis and add to the AllSecurityItemsToWrite collection }
}
这是最简单但最核心的例子。什么好主意吗?我只是想继续添加后台任务,并让另一个任务只是退出队列并写入磁盘,直到没有更多的共享要通过(shareResults
)。
建议使用通道。
Channel<DirectorySecurityInformation> ch =
Channel.CreateUnbounded<DirectorySecurityInformation>();
写var w = ch.Writer;
foreach(var dsi in DSIs)
w.TryWrite(dsi);
w.TryComplete();
读
public async void ReadTask()
{
var r = ch.Reader;
using (var sw = new StreamWriter(filename, true))
{
await foreach(var dsi in r.ReadAllAsync())
sw.WriteLine(dsi);
}
}
while (true) { var dsi = AllSecurityItemsToWrite.Take(); //... }
与使用Take
方法相比,使用GetConsumingEnumerable
方法通常更方便地使用BlockingCollection<T>
:
foreach (var dsi in AllSecurityItemsToWrite.GetConsumingEnumerable())
{
//...
}
这样,当CompleteAdding
方法被调用时,循环将自动停止,并且集合为空。
但我同意shingo,BlockingCollection<T>
不是正确的工具在这种情况下,因为你的工人是在异步上下文中运行。Channel<T>
应该更可取,因为它可以在不阻塞线程的情况下使用。