如何将写入流1的内容管道传输到流2

本文关键字:管道 传输 | 更新日期: 2023-09-27 18:20:40

这是我的场景:

// The producer writes its output into `stream`...
producer.WriteStream(stream);
// ...and the consumer then reads from that same `stream` instance.
consumer.ReadStream(stream);

我需要一种机制,让 producer 生成的字节能够逐步传输给 consumer。

我可以先把所有内容写入 MemoryStream,然后将其位置重置到开头,再交给 consumer 读取,但这样会占用大量内存。

我怎样才能做到这一点?

如何将写入流1的内容管道传输到流2

以管道作为数据的底层传输通道,您可以得到一对支持这种通信方式的流:"写入流"(服务器端)和"读取流"(客户端)。

使用匿名管道或命名管道(如果您需要进程间通信),这非常简单。创建管道流:

// Server end of an anonymous pipe, created with the parameterless constructor's defaults.
AnonymousPipeServerStream pipeServer = new AnonymousPipeServerStream();
// Client end, attached via the string form of the client handle obtained from the server.
AnonymousPipeClientStream pipeClient =
  new AnonymousPipeClientStream(pipeServer.GetClientHandleAsString());

现在您可以使用它们进行写入和读取:

// The producer writes into the server end of the pipe.
producer.WriteStream(pipeServer);
// somewhere else...
// NOTE(review): presumably on another thread or in another process, so that the
// consumer drains the pipe while the producer is still writing - confirm.
consumer.ReadStream(pipeClient);

这段代码只是我出于兴趣随手写的,没有经过测试,可能存在一些错误。使用时只需把 ReaderStream 交给读取方(consumer),把 WriterStream 交给写入方(producer)。

/// <summary>
/// An in-memory, one-directional pipe exposed as a pair of streams: every chunk written to
/// <see cref="WriterStream"/> becomes readable, in order, from <see cref="ReaderStream"/>.
/// Chunks are handed over through a <see cref="BlockingCollection{T}"/>, so the writer and
/// the reader are expected to run on different threads when the reader has to wait for data.
/// </summary>
public class LoopbackStream
{
    /// <summary>Read-only end of the pipe. Hand this to the consumer.</summary>
    public Stream ReaderStream { get; }

    /// <summary>
    /// Write-only end of the pipe. Hand this to the producer. Closing or disposing it marks
    /// the end of the data, after which the reader drains what is left and then reports
    /// end-of-stream.
    /// </summary>
    public Stream WriterStream { get; }

    private readonly BlockingCollection<byte[]> _buffer;

    /// <summary>
    /// Creates a loopback pair backed by an unbounded queue: writes never block, but every
    /// chunk that has not been read yet stays in memory.
    /// </summary>
    public LoopbackStream()
        : this(new BlockingCollection<byte[]>())
    {
    }

    /// <summary>
    /// Creates a loopback pair whose queue holds at most <paramref name="boundedCapacity"/>
    /// pending chunks. Once the queue is full, writes block until the reader consumes a chunk,
    /// which caps the number of buffered chunks when the producer outpaces the consumer.
    /// </summary>
    /// <param name="boundedCapacity">Maximum number of pending chunks; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="boundedCapacity"/> is not a positive value.
    /// </exception>
    public LoopbackStream(int boundedCapacity)
        : this(new BlockingCollection<byte[]>(boundedCapacity))
    {
    }

    private LoopbackStream(BlockingCollection<byte[]> buffer)
    {
        _buffer = buffer;
        ReaderStream = new ReaderStreamInternal(_buffer);
        WriterStream = new WriterStreamInternal(_buffer);
    }

    /// <summary>Checks the usual (buffer, offset, count) triple shared by Read and Write.</summary>
    private static void ValidateRange(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }
        if (buffer.Length - offset < count)
        {
            throw new ArgumentException("Offset and count exceed the length of the buffer.");
        }
    }

    /// <summary>Write-only stream that copies each written range into the shared queue.</summary>
    private sealed class WriterStreamInternal : Stream
    {
        private readonly BlockingCollection<byte[]> _buffer;

        public WriterStreamInternal(BlockingCollection<byte[]> buffer)
        {
            _buffer = buffer;
        }

        public override bool CanRead => false;

        public override bool CanSeek => false;

        public override bool CanWrite => true;

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        /// <summary>
        /// Marks the queue as complete so the reader sees end-of-stream after draining it.
        /// Reached from both <c>Close()</c> and <c>Dispose()</c>.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _buffer.CompleteAdding();
            }
            base.Dispose(disposing);
        }

        /// <summary>Not supported: this end of the pipe is write-only.</summary>
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Enqueues a private copy of <paramref name="count"/> bytes of
        /// <paramref name="buffer"/> starting at <paramref name="offset"/>. Blocks while a
        /// bounded queue is full.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The stream has already been closed.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            ValidateRange(buffer, offset, count);
            if (_buffer.IsAddingCompleted)
            {
                // ObjectDisposedException derives from InvalidOperationException, which is
                // what BlockingCollection.Add would have thrown here.
                throw new ObjectDisposedException(nameof(WriterStreamInternal));
            }
            if (count == 0)
            {
                // An empty chunk carries no data and must not be mistaken for end-of-stream
                // by the reader, so it is never enqueued.
                return;
            }

            // Copy the range: the caller is free to reuse its buffer after Write returns.
            var chunk = new byte[count];
            Array.Copy(buffer, offset, chunk, 0, count);
            _buffer.Add(chunk);
        }

        /// <summary>No-op: written chunks are handed to the queue immediately.</summary>
        public override void Flush()
        {
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>Read-only stream that serves bytes from the queued chunks in order.</summary>
    private sealed class ReaderStreamInternal : Stream
    {
        private readonly IEnumerator<byte[]> _readerEnumerator;

        // Chunk currently being served and the index of its next unread byte.
        private byte[] _currentBuffer;
        private int _currentBufferIndex;

        public ReaderStreamInternal(BlockingCollection<byte[]> buffer)
        {
            _readerEnumerator = buffer.GetConsumingEnumerable().GetEnumerator();
        }

        public override bool CanRead => true;

        public override bool CanSeek => false;

        public override bool CanWrite => false;

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _readerEnumerator.Dispose();
            }
            base.Dispose(disposing);
        }

        /// <summary>
        /// Copies up to <paramref name="count"/> bytes of the current chunk into
        /// <paramref name="buffer"/>. Blocks until a chunk is available.
        /// </summary>
        /// <returns>
        /// The number of bytes copied. A single call never spans two chunks, so this can be
        /// less than <paramref name="count"/>. Returns 0 once the writer has been closed and
        /// all chunks have been consumed.
        /// </returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            ValidateRange(buffer, offset, count);

            // Fetch the next non-empty chunk. Empty chunks are skipped because returning 0
            // for them would signal a false end-of-stream to the caller.
            while (_currentBuffer == null)
            {
                if (!_readerEnumerator.MoveNext())
                {
                    return 0;
                }

                byte[] next = _readerEnumerator.Current;
                if (next != null && next.Length > 0)
                {
                    _currentBuffer = next;
                    _currentBufferIndex = 0;
                }
            }

            int remainingBytes = _currentBuffer.Length - _currentBufferIndex;
            int readBytes = Math.Min(remainingBytes, count);
            Array.Copy(_currentBuffer, _currentBufferIndex, buffer, offset, readBytes);
            _currentBufferIndex += readBytes;

            if (_currentBufferIndex == _currentBuffer.Length)
            {
                // Chunk fully consumed; the next call pulls a fresh one from the queue.
                _currentBuffer = null;
                _currentBufferIndex = 0;
            }

            return readBytes;
        }

        /// <summary>Not supported: this end of the pipe is read-only.</summary>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override void Flush()
        {
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    }
}