Returning database results as a stream

I have a function that returns the results of a database query. These results have become very large, and I would now like to return them as a stream so the client can start processing them sooner and memory usage stays lower. But I don't really know how to do that. The function below works; what I'd like to know is how to change it so that it starts streaming as soon as the first table has been read.

    public Stream GetResults()
    {
        IFormatter formatter = new BinaryFormatter();
        Stream stream = new MemoryStream();
        formatter.Serialize(stream, GetItemsFromTable1());
        formatter.Serialize(stream, GetItemsFromTable2());
        formatter.Serialize(stream, GetItemsFromTable3());
        formatter.Serialize(stream, GetItemsFromTable4());
        stream.Position = 0;            
        return stream;
    }

You can write a custom Stream implementation that acts as a pipe. If you then move the GetItemsFromTable() method calls onto a background task, the client can start reading results from the stream immediately.

In the solution below I use a circular buffer as the backing store for the pipe stream. Memory usage only drops if the client consumes the data quickly enough, but even in the worst case it shouldn't use more memory than your current solution. If memory usage matters more to you than execution speed, your stream could block write calls until space becomes available. The solution below does not block writes; it expands the capacity of the circular buffer so the background thread can keep filling in data without delay.

The GetResults method could look like this:

public Stream GetResults()
{
    // Begin filling the pipe with data on a background thread
    var pipeStream = new CircularBufferPipeStream();
    Task.Run(() => WriteResults(pipeStream));
    // Return pipe stream for immediate usage by client
    // Note: client is responsible for disposing of the stream after reading all data!
    return pipeStream;
}
// Runs on background thread, filling circular buffer with data
void WriteResults(CircularBufferPipeStream stream)
{
    IFormatter formatter = new BinaryFormatter();
    formatter.Serialize(stream, GetItemsFromTable1());
    formatter.Serialize(stream, GetItemsFromTable2());
    formatter.Serialize(stream, GetItemsFromTable3());
    formatter.Serialize(stream, GetItemsFromTable4());
    // Indicate that there's no more data to write
    stream.CloseWritePort();
}

And the circular buffer stream:

/// <summary>
/// Stream that acts as a pipe by supporting reading and writing simultaneously from different threads.
/// Read calls will block until data is available or the CloseWritePort() method has been called.
/// Read calls consume bytes in the circular buffer immediately so that more space is available for writes into the circular buffer.
/// Writes do not block; the capacity of the circular buffer will be expanded as needed to write the entire block of data at once.
/// </summary>
class CircularBufferPipeStream : Stream
{
    const int DefaultCapacity = 1024;
    byte[] _buffer;
    bool _writePortClosed = false;
    object _readWriteSyncRoot = new object();
    int _length;
    ManualResetEvent _dataAddedEvent;
    int _start = 0;
    public CircularBufferPipeStream(int initialCapacity = DefaultCapacity)
    {
        _buffer = new byte[initialCapacity];
        _length = 0;
        _dataAddedEvent = new ManualResetEvent(false);
    }
    public void CloseWritePort()
    {
        lock (_readWriteSyncRoot)
        {
            _writePortClosed = true;
            _dataAddedEvent.Set();
        }
    }
    public override bool CanRead { get { return true; } }
    public override bool CanWrite { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override void Flush() { }
    public override long Length { get { throw new NotImplementedException(); } }
    public override long Position
    {
        get { throw new NotImplementedException(); }
        set { throw new NotImplementedException(); }
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); }
    public override void SetLength(long value) { throw new NotImplementedException(); }
    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesRead = 0;
        while (bytesRead == 0)
        {
            bool waitForData = false;
            lock (_readWriteSyncRoot)
            {
                if (_length != 0)
                    bytesRead = ReadDirect(buffer, offset, count);
                else if (_writePortClosed)
                    break;
                else
                {
                    _dataAddedEvent.Reset();
                    waitForData = true;
                }
            }
            if (waitForData)
                _dataAddedEvent.WaitOne();
        }
        return bytesRead;
    }
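    // Copies up to 'count' bytes out of the circular buffer into 'buffer': first the
    // run up to the end of the backing array, then, if the data wraps around, the run
    // starting at index 0. Callers hold _readWriteSyncRoot.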
    private int ReadDirect(byte[] buffer, int offset, int count)
    {
        int readTailCount = Math.Min(Math.Min(_buffer.Length - _start, count), _length);
        Array.Copy(_buffer, _start, buffer, offset, readTailCount);
        _start += readTailCount;
        _length -= readTailCount;
        if (_start == _buffer.Length)
            _start = 0;
        int readHeadCount = Math.Min(Math.Min(_buffer.Length - _start, count - readTailCount), _length);
        if (readHeadCount > 0)
        {
            Array.Copy(_buffer, _start, buffer, offset + readTailCount, readHeadCount);
            _start += readHeadCount;
            _length -= readHeadCount;
        }
        return readTailCount + readHeadCount;
    }
    public override void Write(byte[] buffer, int offset, int count)
    {
        lock (_readWriteSyncRoot)
        {
            // expand capacity as needed
            if (count + _length > _buffer.Length)
            {
                var expandedBuffer = new byte[Math.Max(_buffer.Length * 2, count + _length)];
                _length = ReadDirect(expandedBuffer, 0, _length);
                _start = 0;
                _buffer = expandedBuffer;
            }
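            // The write position is the end of the valid data, wrapping around the array.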
            int startWrite = (_start + _length) % _buffer.Length;
            int writeTailCount = Math.Min(_buffer.Length - startWrite, count);
            Array.Copy(buffer, offset, _buffer, startWrite, writeTailCount);
            startWrite += writeTailCount;
            _length += writeTailCount;
            if (startWrite == _buffer.Length)
                startWrite = 0;
            int writeHeadCount = count - writeTailCount;
            if (writeHeadCount > 0)
            {
                Array.Copy(buffer, offset + writeTailCount, _buffer, startWrite, writeHeadCount);
                _length += writeHeadCount;
            }
        }
        _dataAddedEvent.Set();
    }
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_dataAddedEvent != null)
            {
                _dataAddedEvent.Dispose();
                _dataAddedEvent = null;
            }
        }
        base.Dispose(disposing);
    }
}
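
For illustration, a client consuming the returned stream might look like the sketch below (this is not part of the original answer; the variable names are placeholders). Because the stream acts as a pipe, the first Deserialize call can complete as soon as the background thread has written the data for table 1, before the remaining tables have even been queried:

// Hypothetical client-side usage of GetResults(); assumes the same BinaryFormatter
// payload layout that WriteResults() produces.
using (var stream = GetResults())
{
    IFormatter formatter = new BinaryFormatter();
    var table1Items = formatter.Deserialize(stream); // available as soon as table 1 is written
    var table2Items = formatter.Deserialize(stream);
    var table3Items = formatter.Deserialize(stream);
    var table4Items = formatter.Deserialize(stream);
    // ... process the items; disposing the stream releases the buffer's wait event
}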

Try

public Stream GetResults()
{
    IFormatter formatter = new BinaryFormatter();
    Stream stream = new MemoryStream();
    formatter.Serialize(stream, GetItemsFromTable1());
    formatter.Serialize(stream, GetItemsFromTable2());
    formatter.Serialize(stream, GetItemsFromTable3());
    formatter.Serialize(stream, GetItemsFromTable4());
    stream.Seek(0L, SeekOrigin.Begin);
    return stream;
}

Why the changes?

  • Removed the using, because the stream is disposed as soon as it leaves the using block. A disposed stream can no longer be used.
  • Seek back to the beginning of the stream. If you start reading without seeking to the beginning, you will deserialize/read from the end of the stream, and unfortunately there is nothing after the end of the stream.

However, I don't see how using a MemoryStream reduces memory usage. I'd suggest chaining it to a DeflateStream or a FileStream instead to reduce RAM usage.
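
As a sketch of that suggestion (not part of the original answer; the GetCompressedResults name is just for illustration), the serialized data could be compressed on its way into the MemoryStream with a DeflateStream from System.IO.Compression, and the client would read it back through a decompressing DeflateStream. A FileStream could be substituted for the MemoryStream in the same way to keep the data out of RAM entirely:

public Stream GetCompressedResults()
{
    IFormatter formatter = new BinaryFormatter();
    var stream = new MemoryStream();
    // Compress while serializing so the buffered copy stays smaller in RAM.
    // leaveOpen: true keeps the MemoryStream usable after the DeflateStream is disposed.
    using (var deflate = new DeflateStream(stream, CompressionMode.Compress, leaveOpen: true))
    {
        formatter.Serialize(deflate, GetItemsFromTable1());
        formatter.Serialize(deflate, GetItemsFromTable2());
        formatter.Serialize(deflate, GetItemsFromTable3());
        formatter.Serialize(deflate, GetItemsFromTable4());
    }
    stream.Seek(0L, SeekOrigin.Begin);
    // The caller must wrap the result in new DeflateStream(stream, CompressionMode.Decompress)
    // before deserializing.
    return stream;
}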

Hope this helps.