如何仅在目的地被消费时将流复制到另一个流

本文关键字:复制 另一个 何仅 目的地 | 更新日期: 2023-09-27 18:17:37

类似于如何将一个流的内容复制到另一个流?

但是我对sourceStream.CopyTo(destStream)的理解意味着它将从头到尾读取整个sourceStream(块或其他)以复制它,然后消费者返回并再次读取流(其副本),导致O(2n)而不是O(n),对吗?如果destStream是一个临时副本(即MemoryStream),那么我也将最终为每个副本加载整个源流到内存中。

是否有一种方法可以做到这一点,以便它只被复制为destStream被消耗/读取?

具体来说,在。net c#中,我需要复制输入流并将其写入多个"目的地"(通过各种辅助库,其中一些库会处理给定的流)。输入可能非常大,并且通常实际上是FileStream,所以当我可以倒带并从磁盘缓冲它时,我宁愿不将整个文件加载到内存中。

示例场景:

void WriteToMany(Stream sourceStream, IEnumerable<ICanPutStream> destinations) {
    foreach(var endpoint in destinations) {
        // <-- I need to make a copy of `stream` here because...
        endpoint.PutStream(sourceStream); // ...some endpoints automatically dispose the stream
    }
}

如果我在PutStream被调用之前做一个拷贝,它将读取整个源流。我可以忍受这一点,但是如果我将它复制到MemoryStream,它也会为每个端点将其加载到内存中(尝试处理可能/尚未处理的东西)。理想情况下,只有在PutStream的内部工作期间,原始流才会被复制/读取。

如何仅在目的地被消费时将流复制到另一个流

除非您可以查找流的开头,否则您必须将整个流复制到内存中以拥有多个消费者。否则,流数据只对第一个消费者可用。

如果你有一个可搜索的流(如FileStream),你想把它传递给多个消费者而不让它被处理,你可以实现一个Stream代理,它将所有成员委托给底层流,除了Dispose。它看起来像这样:

class StreamProxy : Stream
{
    private readonly Stream _stream;
    public StreamProxy(Stream stream)
    {
        if (stream == null) throw new ArgumentNullException(nameof(stream));
        _stream = stream;
    }
    protected override void Dispose(bool disposing)
    {
        //don't dispose inner stream
    }
    public override void Flush()
    {
        _stream.Flush();
    }
    public override long Seek(long offset, SeekOrigin origin)
    {
        return _stream.Seek(offset, origin);
    }
    public override void SetLength(long value)
    {
        _stream.SetLength(value);
    }
    public override int Read(byte[] buffer, int offset, int count)
    {
        return _stream.Read(buffer, offset, count);
    }
    public override void Write(byte[] buffer, int offset, int count)
    {
        _stream.Write(buffer, offset, count);
    }
    public override bool CanRead
    {
        get { return _stream.CanRead; }
    }
    public override bool CanSeek
    {
        get { return _stream.CanSeek; }
    }
    public override bool CanWrite
    {
        get { return _stream.CanWrite; }
    }
    public override long Length
    {
        get { return _stream.Length; }
    }
    public override long Position
    {
        get { return _stream.Position; }
        set { _stream.Position = value; }
    }
}

这样,每个消费者都可以处理他们的流的"副本"(StreamProxy的实例),而不处理底层流。一旦消费者完成,将底层流寻回起点,并将代理传递给另一个消费者。

关于您的问题:是否有一种方法可以做到这一点,以便它只在destStream被消费/读取时复制?您可以扩展上面的StreamProxy类,以便它在内部流中跟踪自己的位置。然后,StreamProxy将负责每个读操作,将内部流寻找到合适的位置并读取下一个块。这样,每个消费者都接收到自己的StreamProxy实例,并且可以在独立的位置从内部流读取数据。

我看不出这种方法比最初提议的StreamProxy有任何优势,除非你的消费者是并行运行的。如果是,你还需要一个同步机制在StreamProxy中,这样读不会重叠,因为内部流一次只能在一个位置。这有效地序列化了消费者(这是从单个Stream开始并且不将其内容复制到内存中的固有限制),并且使该方法总体上效率较低(除非内部流的读性能和消费者的写性能之间存在巨大差异)。

新的read方法可能看起来像:

public override int Read(byte[] buffer, int offset, int count)
{
    lock(_stream)
    {
        //position the inner stream to end of last read (another consumer may have moved it)
        _stream.Seek(Position, SeekOrigin.Begin);
        //read the bytes, up to count
        var count = _stream.Read(buffer, offset, count);
        //update the next read position
        Position += count;
        return count;
    }
}
public override long Position{get;set;}

Stream.CopyTo内部有一个缓冲区(81920字节,如果您没有在过载方法上设置它)。它的实现非常简单,所以你可以改变它,像这样使用它:

void ConsumeStream(Stream source, Stream destination, int bufferSize)
{
    byte[] buffer = new byte[bufferSize];
    int count;
    while ((count = source.Read(buffer, 0, buffer.Length)) != 0)
    {
        destination.Write(buffer, 0, count);
        //Other stuff
    }
}