我正在学习使用RX,并尝试了这个示例。但无法修复突出显示的while语句中发生的异常-while(!f.EndofStream)
我想逐行读取一个巨大的文件,对于每一行数据,我想在不同的线程中进行一些处理(所以我使用了ObserverOn)我希望整件事不同步。我想使用ReadLineAsync,因为它返回TASK,所以我可以将其转换为Observables并订阅它。
我想我首先创建的任务线程位于Rx线程之间。但是,即使我使用currentThread使用Observe和Subscribe,我仍然无法停止异常。想知道我是如何用Rx巧妙地完成这项任务的。
想知道整件事是否可以做得更简单?
static void Main(string[] args)
{
RxWrapper.ReadFileWithRxAsync();
Console.WriteLine("this should be called even before the file read begins");
Console.ReadLine();
}
public static async Task ReadFileWithRxAsync()
{
Task t = Task.Run(() => ReadFileWithRx());
await t;
}
public static void ReadFileWithRx()
{
string file = @"C:FileWithLongListOfNames.txt";
using (StreamReader f = File.OpenText(file))
{
string line = string.Empty;
bool continueRead = true;
***while (!f.EndOfStream)***
{
f.ReadLineAsync()
.ToObservable()
.ObserveOn(Scheduler.Default)
.Subscribe(t =>
{
Console.WriteLine("custom code to manipulate every line data");
});
}
}
}
异常是一个InvalidOperationException
-我不太熟悉FileStream的内部结构,但根据异常消息,这是因为流上有一个正在运行的异步操作。这意味着您必须等待任何ReadLineAsync()
调用完成,然后才能检查EndOfStream
。
Matthew Finlay对您的代码进行了巧妙的重新处理,以解决这个紧迫的问题。然而,我认为它也有自己的问题,还有一个更大的问题需要研究。让我们来看看问题的基本要素:
- 你有一个很大的文件
- 您希望异步处理它
这表明您不希望整个文件都在内存中,您希望在处理完成时得到通知,并且可能您希望尽快处理文件。
两种解决方案都使用一个线程来处理每一行(ObserveOn
将每一行传递给线程池中的一个线程)。这实际上不是一个有效的方法。
从这两种解决方案来看,有两种可能性:
- A。读取文件行的平均时间比处理文件行的时间长
- B。读取文件行的平均时间比处理文件行的时间短
A。文件读取行的速度慢于处理行的速度
在A的情况下,系统在等待文件IO完成时,基本上会将大部分时间闲置。在这种情况下,Matthew的解决方案不会导致内存填充,但值得一看的是,在紧密循环中直接使用ReadLines
是否会因为线程争用较少而产生更好的结果。(如果ReadLines
在调用MoveNext
之前没有得到线路,ObserveOn
将线路推到另一个线程只会为您带来好处——我怀疑它确实得到了线路——但请测试并查看!)
B。文件读取一行比处理一行快
在B的情况下(我认为考虑到你已经尝试过的情况,这更有可能),所有这些行都会开始在内存中排队,对于一个足够大的文件,你最终会在内存中获得大部分。
您应该注意,除非您的处理程序启动异步代码来处理一行,否则所有行都将被串行处理,因为Rx保证OnNext()
处理程序调用不会重叠。
ReadLines()
方法非常好,因为它返回一个IEnumerable<string>
,而正是您对它的枚举驱动了读取文件。然而,当你对此调用ToObservable()
时,它会尽可能快地枚举以生成可观察的事件——Rx中没有反馈(在反应程序中称为"背压")来减缓这个过程。
问题不在于ToObservable
本身,而在于ObserveOn
。ObserveOn
不会阻止它调用的OnNext()
处理程序,直到它的订阅者处理完事件——它会根据目标调度程序尽可能快地将事件排队。
如果删除ObserveOn
,那么只要OnNext
处理程序是同步的,您就会看到每一行都被一次读取和处理,因为ToObservable()
与处理程序在同一个线程上处理枚举。
如果这不是你想要的,并且你试图通过在订阅服务器中启动异步作业(例如Task.Run(() => /* process line */
或类似的作业)来缓解这种情况,以追求并行处理,那么事情就不会像你希望的那样顺利。
因为处理一行比读取一行需要更长的时间,所以您将创建越来越多与传入行不同步的任务。线程数将逐渐增加,线程池将被耗尽。
在这种情况下,Rx并不是一个非常适合的。
您可能想要的是少量的工作线程(每个处理器内核可能有1个),它们一次获取一行代码进行处理,并限制内存中文件的行数。
一个简单的方法可以是这样,它将内存中的行数限制为固定数量的工作者。这是一个基于拉动的解决方案,在这种情况下是一个更好的设计:
private Task ProcessFile(string filePath, int numberOfWorkers)
{
var lines = File.ReadLines(filePath);
var parallelOptions = new ParallelOptions {
MaxDegreeOfParallelism = numberOfWorkers
};
return Task.Run(() =>
Parallel.ForEach(lines, parallelOptions, ProcessFileLine));
}
private void ProcessFileLine(string line)
{
/* Your processing logic here */
Console.WriteLine(line);
}
并像这样使用:
static void Main()
{
var processFile = ProcessFile(
@"C:Usersjames.worldDownloadsexample.txt", 8);
Console.WriteLine("Processing file...");
processFile.Wait();
Console.WriteLine("Done");
}
最终注释
有一些方法可以处理Rx中的背压(在SO周围搜索一些讨论),但Rx处理得不好,我认为由此产生的解决方案比上面的替代方案可读性差。您还可以考虑许多其他方法(基于参与者的方法,如TPL数据流,或用于高性能无锁方法的LMAX Disruptor风格的环形缓冲区),但从队列中提取工作的核心思想将很普遍。
即使在这个分析中,我也很方便地掩盖了你正在做什么来处理文件,并默认每一行的处理都是计算绑定的,并且是真正独立的。如果有工作要合并结果和/或IO活动来存储输出,那么所有的赌注都会落空——你也需要仔细检查这方面的效率。
在考虑将并行工作作为优化的大多数情况下,通常有很多变量在起作用,因此最好测量每种方法的结果,以确定什么是最佳的。测量是一门艺术——一定要测量真实的场景,取每次测试多次运行的平均值,并在运行之间正确重置环境(例如,消除缓存效应),以减少测量误差。
我还没有研究是什么导致了您的异常,但我认为写这篇文章最简洁的方法是:
File.ReadLines(file)
.ToObservable()
.ObserveOn(Scheduler.Default)
.Subscribe(Console.Writeline);
注意:ReadLines与ReadAllLines的不同之处在于,它将在不读取整个文件的情况下开始屈服,这是您想要的行为。