这是我的场景:
producer.WriteStream(stream);
consumer.ReadStream(stream);
我想要一些允许producer
生成的字节逐渐传输到consumer
的东西。
我可以将所有内容写入MemoryStream
,然后倒带并在consumer
上读取,但这会导致巨大的内存消耗。
我怎样才能做到这一点?
使用管道作为数据的底层传输,您可以拥有允许这种通信机制的"写流"(服务器)和"读流"(客户端)。
使用匿名管道或命名管道(如果您需要进程间通信),这非常简单。创建管道流:
AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream();
AnonymousPipeClientStream pipeClient =
new AnonymousPipeClientStream(pipeServer.GetClientHandleAsString());
现在你可以用这些来写&读取:
producer.WriteStream(pipeServer);
// somewhere else...
consumer.ReadStream(pipeClient);
我只是为了好玩而把它放在一起,它没有经过测试,可能有一些错误。您只需将ReaderStream
传递给读取器,将WriterStream
传递给编写器。
public class LoopbackStream
{
public Stream ReaderStream { get; }
public Stream WriterStream { get; }
private readonly BlockingCollection<byte[]> _buffer;
public LoopbackStream()
{
_buffer = new BlockingCollection<byte[]>();
ReaderStream = new ReaderStreamInternal(_buffer);
WriterStream = new WriterStreamInternal(_buffer);
}
private class WriterStreamInternal : Stream
{
private readonly BlockingCollection<byte[]> _buffer;
public WriterStreamInternal(BlockingCollection<byte[]> buffer)
{
_buffer = buffer;
CanRead = false;
CanWrite = true;
CanSeek = false;
}
public override void Close()
{
_buffer.CompleteAdding();
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
var newData = new byte[count];
Array.Copy(buffer, offset, newData, 0, count);
_buffer.Add(newData);
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
}
private class ReaderStreamInternal : Stream
{
private readonly BlockingCollection<byte[]> _buffer;
private readonly IEnumerator<byte[]> _readerEnumerator;
private byte[] _currentBuffer;
private int _currentBufferIndex = 0;
public ReaderStreamInternal(BlockingCollection<byte[]> buffer)
{
_buffer = buffer;
CanRead = true;
CanWrite = false;
CanSeek = false;
_readerEnumerator = _buffer.GetConsumingEnumerable().GetEnumerator();
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_readerEnumerator.Dispose();
}
base.Dispose(disposing);
}
public override int Read(byte[] buffer, int offset, int count)
{
if (_currentBuffer == null)
{
bool read = _readerEnumerator.MoveNext();
if (!read)
return 0;
_currentBuffer = _readerEnumerator.Current;
}
var remainingBytes = _currentBuffer.Length - _currentBufferIndex;
var readBytes = Math.Min(remainingBytes, count);
Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes);
_currentBufferIndex += readBytes;
if (_currentBufferIndex == _currentBuffer.Length)
{
_currentBuffer = null;
_currentBufferIndex = 0;
}
return readBytes;
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override void Flush()
{
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override bool CanRead { get; }
public override bool CanSeek { get; }
public override bool CanWrite { get; }
public override long Length
{
get { throw new NotSupportedException(); }
}
public override long Position
{
get { throw new NotSupportedException(); }
set { throw new NotSupportedException(); }
}
}
}