今天又是一个Rx问题
简单地说,我正在使用异步IO来操作流。然而,我们都知道,当使用异步读取时,我们不一定得到我们想要的所有字节——因此在XAsync
方法上返回int。我想知道我怎么能告诉Rx Observable
重试读取,没有从流中读取正确的字节数,并由正确的量偏移?
目前,我有这个,但不知道如何设置偏移参数在ReadAsync.
private IDisposable _streamMessageContract;
private readonly byte[] _readBuffer = new byte[8024];
public void Start()
{
// Subscribe to the stream for dataz
_streamMessageContract = Observable.FromAsync<int>(() => _stream.ReadAsync(_readBuffer, 0, _readBuffer.Length))
.Repeat()
.Subscribe(
y => _RawBytesReceived(_readBuffer, y),
ex => _Exception(ex),
() => _StreamClosed());
}
#region Helpers
private void _RawBytesReceived(byte[] bytes, int actualBytesRead)
{
}
private void _StreamClosed()
{
}
private void _Exception(Exception e)
{
}
#endregion
最简单的方法是在闭包中使用局部变量,加上Defer
来强制可观察对象在每次迭代时重新计算其函数。
假设您想在当前块结束后继续读取下一个块,您将以这样的内容结束…
// An observable that will yield the next block in the stream.
// If the resulting block length is less than blockSize, then the
// end of the stream was reached.
private IObservable<byte[]> ReadBlock(Stream stream, int blockSize)
{
// Wrap the whole thing in Defer() so that we
// get a new memory block each time this observable is subscribed.
return Observable.Defer(() =>
{
var block = new byte[blockSize];
int numRead = 0;
return Observable
.Defer(() => Observable.FromAsync<int>(() =>
stream.ReadAsync(block, numRead, block.Length - numRead)))
.Do(y -=> numRead += y)
.Repeat()
.TakeWhile(y => y > 0 && numRead != blockSize) // take until EOF or entire block is read
.LastOrDefaultAsync() // only emit the final result of this operation
.Select(_ =>
{
// If we hit EOF, then resize our result array to
// the number of bytes actually read
if (numRead < blockSize)
{
block = block.Take(numRead).ToArray();
}
return block;
});
});
}
public void Start()
{
// Subscribe to the stream for dataz.
// Just keep reading blocks until
// we get the final (under-sized) block
_streamMessageContract = ReadBlock(stream, 8024)
.Repeat()
.TakeWhile(block => block.Length == 8024) // if block is small then that means we hit the end of the stream
.Subscribe(
block => _RawBytesReceived(block),
ex => _Exception(ex),
() => _StreamClosed());
}