假设我有以下Observable
。 (请注意,解析逻辑位于不同的层中,并且应该是可测试的,因此它必须保持为单独的方法。 另请注意,真正的循环是解析 XML,并具有各种分支和异常处理)。
IObservable<string> GetLinesAsync(StreamReader r)
{
return Observable.Create<string>(subscribeAsync: async (observer, ct) =>
{
//essentially force a continuation/callback to illustrate my problem
await Task.Delay(5);
while (!ct.IsCancellationRequested)
{
string readLine = await r.ReadLineAsync();
if (readLine == null)
break;
observer.OnNext(readLine);
}
});
}
我想使用它,例如与另一个产生StreamReader
Observable
一起使用,如下所示,但无论如何我都无法让处置工作。
[TestMethod]
public async Task ReactiveTest()
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
var source1 = Observable.Using(
() => File.OpenRead(filePath),
readFile => Observable.Using(() => new StreamReader(readFile),
reader => Observable.Return(reader)
)
);
//"GetLinesAsync" already exists. How can I use it?
var combined = source1
.SelectMany(GetLinesAsync);
int count = await combined.Count();
}
如果您运行几次(例如,使用断点等),您应该会看到它爆炸了,因为 TextReader 已关闭。 (在我的实际问题中,它偶尔发生在ReadLineAsync
但Task.Delay
使它更容易发生)。 显然,异步性质导致第一个可观察对象释放流,并且只有在该之后才会发生延续,当然此时流已经关闭。
所以:
- 第一个一次性使用装置设置正确吗? 我尝试了其他方式(见下文*)
- 这是执行异步可观察(即GetLinesAsync)的正确方法吗? 我还需要为此做些什么吗?
- 这是将可观察量链接在一起的正确方法吗? 假设
GetLinesAsync
已经存在,如果可能的话,它的签名不应该被更改(例如,接受IObservable<StreamReader>
) - 如果这是将可观察量粘合在一起的正确方法,那么有没有办法让它与
async
使用一起使用?
*这是我设置第一个可观察者的另一种方式
var source3 = Observable.Create<StreamReader>(observer =>
{
FileStream readFile = File.OpenRead(filePath);
StreamReader reader = new StreamReader(readFile);
observer.OnNext(reader);
observer.OnCompleted();
return new CompositeDisposable(readFile, reader);
});
你真的需要充分利用这里的Defer
和Using
运算符。
Using
专门用于以下情况:你希望分别在订阅开始和完成时创建并最终处置一次性资源。
Defer
是一种确保在拥有新订阅时始终创建新管道的方法(在 MSDN 上阅读详细信息)
你的第二种方法是要走的路。你做对了100%:
Observable.Using(
() => File.OpenRead(filePath),
readFile =>
Observable.Using(
() => new StreamReader(readFile),
reader =>
这将在正确的时间为每个资源打开和处置资源。
这是您需要修复的代码块之前的内容以及reader =>
之后的内容。
reader =>
之后是这样的:
Observable
.Defer(() => Observable.FromAsync(() => reader.ReadLineAsync()))
.Repeat()
.TakeWhile(x => x != null)));
这是 Rx 从流中读取直到完成的惯用方式。
"之前"块只是另一种Defer
,以确保您计算每个新订阅者的Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini")
。在这种情况下没有必要,因为我们知道filePath
不会更改,但这是一种很好的做法,并且当此值可以更改时可能至关重要。
以下是完整代码:
public async Task ReactiveTest()
{
IObservable<string> combined =
Observable.Defer(() =>
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
return
Observable.Using(
() => File.OpenRead(filePath),
readFile =>
Observable.Using(
() => new StreamReader(readFile),
reader =>
Observable
.Defer(() => Observable.FromAsync(() => reader.ReadLineAsync()))
.Repeat()
.TakeWhile(x => x != null)));
});
int count = await combined.Count();
}
我已经测试过它,它运行良好。
假设您有一个固定的签名GetLines
您可以这样做:
public IObservable<string> GetLines(StreamReader reader)
{
return Observable
.Defer(() => Observable.FromAsync(() => reader.ReadLineAsync()))
.Repeat()
.TakeWhile(x => x != null);
}
public async Task ReactiveTest()
{
IObservable<string> combined =
Observable.Defer(() =>
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
return
Observable.Using(
() => File.OpenRead(filePath),
readFile =>
Observable.Using(
() => new StreamReader(readFile),
GetLines));
});
int count = await combined.Count();
}
它也可以工作并经过测试。
您遇到的问题是您的序列返回单个项目,即读取器。使用读取器,需要打开文件流。不幸的是,文件流在创建流读取器后立即关闭:
- 创建
StreamReader reader
OnNext(reader)
被称为using
块退出,处理流- 调用 OnComplete,终止订阅
哎呀!
若要解决此问题,必须将StreamReader
的生存期与使用者(而不是生产者)的生存期联系起来。发生原始错误是因为Observable.Using
在源上调用资源后立即释放OnCompleted
资源。
// Do not dispose of the reader when it is created
var readerSequence = Observable.Return(new StreamReader(ms));
var combined = readerSequence
.Select(reader =>
{
return Observable.Using(() => reader, resource => GetLines(resource));
})
.Concat();
我不是这个的忠实粉丝,因为你现在依靠你的消费者清理每个StreamReader
但我还没有制定更好的方法!
到目前为止,这是允许我继续使用 GetLinesAsync 时唯一有效的方法:
//"GetLinesAsync" already exists. How can I use it?
[TestMethod]
public async Task ReactiveTest2()
{
var combined2 = Observable.Create<string>(async observer =>
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
using (FileStream readFile = File.OpenRead(filePath))
{
using (StreamReader reader = new StreamReader(readFile))
{
await GetLinesAsync(reader)
.ForEachAsync(result => observer.OnNext(result));
}
}
});
int count = await combined2.Count();
}
这不能可靠地工作:
[TestMethod]
public async Task ReactiveTest3()
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
var source1 = Observable.Defer(() => Observable.Using(
() => File.OpenRead(filePath),
readFile => Observable.Using(() => new StreamReader(readFile),
reader => Observable.Return(reader)
)
));
//"GetLines" already exists. How can I use it?
var combined = source1
.SelectMany(reader => Observable.Defer(() => GetLinesAsync(reader)));
int count = await combined.Count();
}
根据Enigmativity的解决方案,它似乎只有在有一个可观察性时才有效:
[TestMethod]
public async Task ReactiveTest4()
{
var filePath = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.Windows), "win.ini");
var source1 = Observable.Using(
() => File.OpenRead(filePath),
readFile => Observable.Using(() => new StreamReader(readFile),
reader => GetLinesAsync(reader)
)
);
int count = await source1.Count();
}
但是我还没有找到一种方法来保持两个可观察量之间的分离(我这样做是为了分层和单元测试目的)并使其正常工作,所以我不认为这个问题得到了回答。