从流中异步重复读取



我如何构建下面的代码,以便我可以在块中读取所有流。目前它一直返回相同的数据。

public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize)
{
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
    var buffer = new byte[bufferSize];
    return asyncRead(buffer, 0, bufferSize)
        .Select(cbRead =>
                    {
                        var dataChunk = new byte[cbRead];
                        Buffer.BlockCopy(buffer, 0, dataChunk, 0, cbRead);
                        return dataChunk;
                    })
        .Repeat()
        .TakeWhile(dataChunk => dataChunk.Length > 0);
}

也掉进了同样的陷阱。异步流的行为类似于可重放流。虽然它只包含一个项目。要解决这个问题,在Defer中像这样包装

Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
...

相关内容

  • 没有找到相关文章

最新更新