我有一个可观察的流,每次文件观察器检测到更改发生时都会勾选它。然后,此刻度会导致嵌套的 Observable 启动以重复读取打开的文件流,直到读取所有新更新,然后将它们平展到最终输出流。
我遇到的问题是,如果文件发生了更改,启动了嵌套的可观察量,如果这个嵌套的可观察量仍在从文件中读取行并且在文件发生第二次更改时尚未完成,那么文件观察器可观察量会导致第二个嵌套可观察量启动,因此您有两个嵌套的可观察量都试图从同一流中读取, 导致无效操作异常 - 流正由另一个进程使用
如何更改此可观察量,使其仅允许父可观察量在前一个可观察量已完成时创建嵌套可观察量?即像手动重置事件一样门控父可观察,这只允许它在上一个嵌套完成并停止使用流后继续?
我的代码
var fileSystemWatcherChanges = Observable<Unit>()
/// abbreviated for question, code here opens a file watcher to create this stream
var outputStream = fileSystemWatcherChanges
.StartWith(Unit.Default)
.Select(x =>
Observable
.Defer(() => Observable.FromAsync(sr.ReadLineAsync))
.Repeat()
.TakeUntil(w => w == null))
.Merge()
.Where(w => w != null))
使用解决方案更新:
对于那些对解决此问题的潜在方法感兴趣的人,我已经做了一些基本的锁定,似乎已经解决了这个问题。如果我们当前仍在处理内部可观察量,它将忽略刻度,我认为这在这种情况下是令人满意的,因为更新刻度只是告诉我们有更多的行要读取,并且当前正在执行的 ReadLine 可观察量应该拾取这些新行如果它们在那里。当我们主动处理文件流时,我们不关心任何文件更新触发器。
var readingFile = 0;
var outputStream = fileSystemWatcherChanges
.StartWith(Unit.Default)
.Select(delegate(Unit x)
{
// Deny a second file update from starting to read the stream if we are already reading it
if (0 == Interlocked.Exchange(ref readingFile, 1))
{
return Observable
.Defer(() => Observable.FromAsync(sr.ReadLineAsync))
.Repeat()
.TakeUntil(w => w == null)
.Finally(() => Interlocked.Exchange(ref readingFile, 0));
}
return Observable.Empty<string>();
})
.Merge()
.Where(w => w != null))
而不是Merge
你应该使用Concat
它订阅下一个可观察量,直到上一个观察量完成:
public static IObservable<string> ReadAsObservable<TNotification>(this TextReader reader, IObservable<TNotification> notification)
{
return Observable.Using(() => reader,
r =>
{
var readLine = Observable.FromAsync(r.ReadLineAsync);
return notification.Select(_ => readLine.Repeat().TakeWhile(x => x != null))
.Concat();
});
}