从异步链接可观察量时的处置



假设我有以下Observable。 (请注意,解析逻辑位于不同的层中,并且应该是可测试的,因此它必须保持为单独的方法。 另请注意,真正的循环是解析 XML,并具有各种分支和异常处理)。

IObservable<string> GetLinesAsync(StreamReader r)
{
return Observable.Create<string>(subscribeAsync: async (observer, ct) =>
{
//essentially force a continuation/callback to illustrate my problem
await Task.Delay(5);
while (!ct.IsCancellationRequested)
{
string readLine = await r.ReadLineAsync();
if (readLine == null)
break;
observer.OnNext(readLine);
}
});
}

我想使用它,例如与另一个产生StreamReaderObservable一起使用,如下所示,但无论如何我都无法让处置工作。

[TestMethod]
public async Task ReactiveTest()
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
var source1 = Observable.Using(
() => File.OpenRead(filePath),   
readFile => Observable.Using(() => new StreamReader(readFile),
reader => Observable.Return(reader)
)
);
//"GetLinesAsync" already exists.  How can I use it?
var combined = source1
.SelectMany(GetLinesAsync);
int count = await combined.Count();
}

如果您运行几次(例如,使用断点等),您应该会看到它爆炸了,因为 TextReader 已关闭。 (在我的实际问题中,它偶尔发生在ReadLineAsyncTask.Delay使它更容易发生)。 显然,异步性质导致第一个可观察对象释放流,并且只有在该之后才会发生延续,当然此时流已经关闭。

所以:

  1. 第一个一次性使用装置设置正确吗? 我尝试了其他方式(见下文*)
  2. 这是执行异步可观察(即GetLinesAsync)的正确方法吗? 我还需要为此做些什么吗?
  3. 这是将可观察量链接在一起的正确方法吗? 假设GetLinesAsync已经存在,如果可能的话,它的签名不应该被更改(例如,接受IObservable<StreamReader>)
  4. 如果这是将可观察量粘合在一起的正确方法,那么有没有办法让它与async使用一起使用?

*这是我设置第一个可观察者的另一种方式

var source3 = Observable.Create<StreamReader>(observer =>
{
FileStream readFile = File.OpenRead(filePath);
StreamReader reader = new StreamReader(readFile);
observer.OnNext(reader);
observer.OnCompleted();
return new CompositeDisposable(readFile, reader);
});

你真的需要充分利用这里的DeferUsing运算符。

Using专门用于以下情况:你希望分别在订阅开始和完成时创建并最终处置一次性资源。

Defer是一种确保在拥有新订阅时始终创建新管道的方法(在 MSDN 上阅读详细信息)

你的第二种方法是要走的路。你做对了100%:

Observable.Using(
() => File.OpenRead(filePath),
readFile =>
Observable.Using(
() => new StreamReader(readFile),
reader =>

这将在正确的时间为每个资源打开和处置资源。

这是您需要修复的代码块之前的内容以及reader =>之后的内容。

reader =>之后是这样的:

Observable
.Defer(() => Observable.FromAsync(() => reader.ReadLineAsync()))
.Repeat()
.TakeWhile(x => x != null)));

这是 Rx 从流中读取直到完成的惯用方式。

"之前"块只是另一种Defer,以确保您计算每个新订阅者的Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini")。在这种情况下没有必要,因为我们知道filePath不会更改,但这是一种很好的做法,并且当此值可以更改时可能至关重要。

以下是完整代码:

public async Task ReactiveTest()
{
IObservable<string> combined =
Observable.Defer(() =>
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
return
Observable.Using(
() => File.OpenRead(filePath),
readFile =>
Observable.Using(
() => new StreamReader(readFile),
reader =>
Observable
.Defer(() => Observable.FromAsync(() => reader.ReadLineAsync()))
.Repeat()
.TakeWhile(x => x != null)));
});
int count = await combined.Count();
}

我已经测试过它,它运行良好。

假设您有一个固定的签名GetLines您可以这样做:

public IObservable<string> GetLines(StreamReader reader)
{
return Observable
.Defer(() => Observable.FromAsync(() => reader.ReadLineAsync()))
.Repeat()
.TakeWhile(x => x != null);
}
public async Task ReactiveTest()
{
IObservable<string> combined =
Observable.Defer(() =>
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
return
Observable.Using(
() => File.OpenRead(filePath),
readFile =>
Observable.Using(
() => new StreamReader(readFile),
GetLines));
});
int count = await combined.Count();
}

它也可以工作并经过测试。

您遇到的问题是您的序列返回单个项目,即读取器。使用读取器,需要打开文件流。不幸的是,文件流在创建流读取器后立即关闭:

  1. 创建StreamReader reader
  2. OnNext(reader)被称为
  3. using块退出,处理流
  4. 调用 OnComplete,终止订阅

哎呀!

若要解决此问题,必须将StreamReader的生存期与使用者(而不是生产者)的生存期联系起来。发生原始错误是因为Observable.Using在源上调用资源后立即释放OnCompleted资源。

// Do not dispose of the reader when it is created
var readerSequence = Observable.Return(new StreamReader(ms));
var combined = readerSequence
.Select(reader =>
{
return Observable.Using(() => reader, resource => GetLines(resource));
})
.Concat();

我不是这个的忠实粉丝,因为你现在依靠你的消费者清理每个StreamReader但我还没有制定更好的方法!

到目前为止,这是允许我继续使用 GetLinesAsync 时唯一有效的方法:

//"GetLinesAsync" already exists.  How can I use it?

[TestMethod]
public async Task ReactiveTest2()
{
var combined2 = Observable.Create<string>(async observer =>
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
using (FileStream readFile = File.OpenRead(filePath))
{
using (StreamReader reader = new StreamReader(readFile))
{
await GetLinesAsync(reader)
.ForEachAsync(result => observer.OnNext(result));
}
}
});

int count = await combined2.Count();
}

这不能可靠地工作:

[TestMethod]
public async Task ReactiveTest3()
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
var source1 = Observable.Defer(() => Observable.Using(
() => File.OpenRead(filePath),
readFile => Observable.Using(() => new StreamReader(readFile),
reader => Observable.Return(reader)
)
));
//"GetLines" already exists.  How can I use it?
var combined = source1      
.SelectMany(reader => Observable.Defer(() => GetLinesAsync(reader)));
int count = await combined.Count();
}

根据Enigmativity的解决方案,它似乎只有在有一个可观察性时才有效:

[TestMethod]
public async Task ReactiveTest4()
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
var source1 = Observable.Using(
() => File.OpenRead(filePath),
readFile => Observable.Using(() => new StreamReader(readFile),
reader => GetLinesAsync(reader)
)
);
int count = await source1.Count();
}

但是我还没有找到一种方法来保持两个可观察量之间的分离(我这样做是为了分层和单元测试目的)并使其正常工作,所以我不认为这个问题得到了回答。

相关内容

  • 没有找到相关文章

最新更新