我正试图用我从字符串创建的流替换我从TcpClient.GetStream()
获得的流。
我正在使用以下方法来创建所述流:
public Stream GenerateStreamFromString(string s)
{
MemoryStream stream = new MemoryStream();
StreamWriter writer = new StreamWriter(stream);
writer.Write(s);
writer.Flush();
stream.Position = 0;
return stream;
}
然而,这个流是通过使用库中的Stream.Read()
来读取的,我不想改变。问题是我用一个空字符串创建这个流,因为对象需要一个流来启动,通常当使用TcpClient
流时,它会停止在Stream.Read()
,直到它有东西要读,但不是用我从字符串创建的流。
所以我的问题,我如何创建一个空流,我可以以后从字符串添加数据?
在内部使用BlockingCollection<>
作为队列,您可以这样写:
public class WatitingStream : Stream
{
private BlockingCollection<byte[]> Packets = new BlockingCollection<byte[]>();
private byte[] IncompletePacket;
private int IncompletePacketOffset;
public WatitingStream()
{
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
Packets.CompleteAdding();
}
base.Dispose(disposing);
}
public override bool CanRead
{
get { return Packets.IsCompleted; }
}
public override bool CanSeek
{
get { return false; }
}
public override bool CanWrite
{
get { return Packets.IsAddingCompleted; }
}
public override void Flush()
{
}
public override long Length
{
get
{
throw new NotSupportedException();
}
}
public override long Position
{
get
{
throw new NotSupportedException();
}
set
{
throw new NotSupportedException();
}
}
public override int Read(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return 0;
}
byte[] packet;
int packetOffset;
if (IncompletePacket != null)
{
packet = IncompletePacket;
packetOffset = IncompletePacketOffset;
}
else
{
if (Packets.IsCompleted)
{
return 0;
}
packet = Packets.Take();
packetOffset = 0;
}
int read = Math.Min(packet.Length - packetOffset, count);
Buffer.BlockCopy(packet, packetOffset, buffer, offset, read);
packetOffset += read;
if (packetOffset < packet.Length)
{
IncompletePacket = packet;
IncompletePacketOffset = packetOffset;
}
else
{
IncompletePacket = null;
IncompletePacketOffset = 0;
}
return read;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
if (count == 0)
{
return;
}
byte[] packet = new byte[count];
Buffer.BlockCopy(buffer, offset, packet, 0, count);
Packets.Add(packet);
}
}
作为普通流使用。Write
不阻塞。Read
块
必须做出一些决定:这个Stream
是基于"数据包"的。它不会Write
零长度的数据包,并且Read
将返回一个数据包的数据。Read
将不会在下一个数据包上继续。如果在Read
之后还有数据留在数据包中,那么这些数据将被保存到下一个Read
。Dispose()
将停止Write
(因此,如果"客户端"在"服务器"之前执行Dispose()
,如果服务器试图执行Write
,则服务器将获得异常)。如果"服务器"先执行Dispose()
,则"客户端"可以完成仍然存在的数据包的读取。显然,可以(很容易)将这个类分成两个类(一个Server
和一个Client
),其中Server
保留BlockingCollection<>
,而客户端有对"服务器"的引用。这将解决" Dispose()
"异常/问题(但会使代码大小加倍:-))