我有TCP服务器,它可以不间断地接收大数据。我需要把这个流广播给很多客户端。
更新:我需要播放视频流。也许有现成的解决方案?
如果您想异步执行此操作,那么您可以利用System.Threading.Tasks
名称空间。
首先,您需要一个Stream
实例到Task
实例的映射,可以等待完成:
IDictionary<Stream, Task> streamToTaskMap = outputStreams.
ToDictionary(s => s, Task.Factory.StartNew(() => { });
上面有一点开销,因为浪费了一个什么都不做的Task
实例,但是考虑到需要执行的Task
实例和延续的数量,这个代价是很小的。
从那里,您将从流中读取内容,然后将其异步地写入每个Stream
实例:
byte[] buffer = new byte[<buffer size>];
int read = 0;
while ((read = inputStream.Read(buffer, 0, buffer.Length)) > 0)
{
// The buffer to copy into.
byte[] copy = new byte[read];
// Perform the copy.
Array.Copy(buffer, copy, read);
// Cycle through the map, and replace the task with a continuation
// on the task.
foreach (Stream stream in streamToTaskMap.Keys)
{
// Continue.
streaToTaskMap[stream] = streaToTaskMap[stream].ContinueWith(t => {
// Write the bytes from the copy.
stream.Write(copy, 0, copy.Length);
});
}
}
最后,您可以通过调用
等待所有写入的流。Task.WaitAll(streamToTaskMap.Values.ToArray());
有几件事需要注意。
首先,需要buffer
的副本,因为传递给ContinueWith
的lambda;lambda是一个封装buffer
的闭包,因为它是异步处理的,所以内容可能会改变。每个continuation都需要自己的缓冲区副本来读取。
这也是为什么调用Stream.Write
使用Array.Length
属性;否则,必须在循环的每次迭代中复制read
变量。
此外,能够在Stream
类上使用BeginWrite
/EndWrite
方法将是更理想的;因为没有ContinueWithAsync
方法可以获取Task
并继续异步方法,所以调用read的异步版本没有任何好处。
这是最好自己调用BeginWrite/EndWrite(以及BeginRead
/EndRead
)以充分利用异步操作的情况之一;当然,这会有点复杂,因为您将没有Task
提供的对操作结果的封装,并且如果您使用匿名方法/闭包,则必须对buffer
采取相同的预防措施。
生成一个线程,将流传递给它,以及要写入的流
byte[] buffer = new byte[BUFFER_SIZE];
int btsRead = 0;
while ((btsRead = inputStream.Read(buffer, 0, BUFFER_SIZE)) > 0)
{
foreach (Stream oStream in outputStreams)
oStream.Write(buffer, 0, btsRead);
}
编辑:并行写:
用:
Parallel.ForEach(outputStreams, oStream =>
{
oStream.Write(buffer, 0, btsRead);
});
基本上,您想要线程化应用程序。下面是一个关于线程和TCP/IP的简单示例
c#教程-简单线程TCP服务器|打开代码