如何复制流到许多流异步c# . net

本文关键字:许多流 异步 net 何复制 复制 | 更新日期: 2023-09-27 18:13:55

我有TCP服务器,它可以不间断地接收大数据。我需要把这个流广播给很多客户端。

更新:我需要播放视频流。也许有现成的解决方案?

如何复制流到许多流异步c# . net

如果您想异步执行此操作,那么您可以利用System.Threading.Tasks名称空间。

首先,您需要一个Stream实例到Task实例的映射,可以等待完成:

IDictionary<Stream, Task> streamToTaskMap = outputStreams.
    ToDictionary(s => s, Task.Factory.StartNew(() => { });

上面有一点开销,因为浪费了一个什么都不做的Task实例,但是考虑到需要执行的Task实例和延续的数量,这个代价是很小的。

从那里,您将从流中读取内容,然后将其异步地写入每个Stream实例:

byte[] buffer = new byte[<buffer size>];
int read = 0;
while ((read = inputStream.Read(buffer, 0, buffer.Length)) > 0)
{
    // The buffer to copy into.
    byte[] copy = new byte[read];
    // Perform the copy.
    Array.Copy(buffer, copy, read);
    // Cycle through the map, and replace the task with a continuation
    // on the task.
    foreach (Stream stream in streamToTaskMap.Keys)
    {
        // Continue.
        streaToTaskMap[stream] = streaToTaskMap[stream].ContinueWith(t => {
            // Write the bytes from the copy.
            stream.Write(copy, 0, copy.Length);
        });
    }
}

最后,您可以通过调用

等待所有写入的流。
Task.WaitAll(streamToTaskMap.Values.ToArray());

有几件事需要注意。

首先,需要buffer的副本,因为传递给ContinueWith的lambda;lambda是一个封装buffer的闭包,因为它是异步处理的,所以内容可能会改变。每个continuation都需要自己的缓冲区副本来读取。

这也是为什么调用Stream.Write使用Array.Length属性;否则,必须在循环的每次迭代中复制read变量。

此外,能够在Stream类上使用BeginWrite/EndWrite方法将是更理想的;因为没有ContinueWithAsync方法可以获取Task并继续异步方法,所以调用read的异步版本没有任何好处。

这是最好自己调用BeginWrite/EndWrite(以及BeginRead/EndRead)以充分利用异步操作的情况之一;当然,这会有点复杂,因为您将没有Task提供的对操作结果的封装,并且如果您使用匿名方法/闭包,则必须对buffer采取相同的预防措施。

生成一个线程,将流传递给它,以及要写入的流

byte[] buffer = new byte[BUFFER_SIZE];
int btsRead = 0;
while ((btsRead = inputStream.Read(buffer, 0, BUFFER_SIZE)) > 0)
{
    foreach (Stream oStream in outputStreams)
        oStream.Write(buffer, 0, btsRead);
}

编辑:并行写:

用:

Parallel.ForEach(outputStreams, oStream =>
{
    oStream.Write(buffer, 0, btsRead);
});

基本上,您想要线程化应用程序。下面是一个关于线程和TCP/IP的简单示例

c#教程-简单线程TCP服务器|打开代码