我如何构建下面的代码,以便我可以在块中读取所有流。目前它一直返回相同的数据。
public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize)
{
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
var buffer = new byte[bufferSize];
return asyncRead(buffer, 0, bufferSize)
.Select(cbRead =>
{
var dataChunk = new byte[cbRead];
Buffer.BlockCopy(buffer, 0, dataChunk, 0, cbRead);
return dataChunk;
})
.Repeat()
.TakeWhile(dataChunk => dataChunk.Length > 0);
}
也掉进了同样的陷阱。异步流的行为类似于可重放流。虽然它只包含一个项目。要解决这个问题,在Defer
中像这样包装
Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
...