Gzip仅在达到阈值之后

本文关键字:阈值 之后 Gzip | 更新日期: 2023-09-27 18:19:27

我需要每天归档用于构建报告的所有数据。我使用gzip压缩大部分数据,因为有些数据集可能非常大(10mb+)。我把每个单独的protobuf图写到一个文件中。我还将一组固定的已知小对象类型列入白名单,并添加了一些代码来检测文件在读取时是否使用了gzip。这是因为小文件在压缩时实际上可能比未压缩时更大。

不幸的是,仅仅由于数据的性质,我可能只有较大对象类型的几个元素,白名单方法可能会有问题。

有没有办法将对象写入流中,并且只有当它达到阈值(如8kb)时,才压缩它?我事先不知道对象的大小,有时我有一个IEnumerable<T>的对象图,它的大小可能相当大。

编辑:代码相当基本。我确实略述了将其存储在filestreamdb表中的事实。这对于实现目的来说并不重要。我删除了一些无关的代码。

public Task SerializeModel<T>(TransactionalDbContext dbConn, T Item, DateTime archiveDate, string name)
{
    var continuation = (await dbConn
        .QueryAsync<PathAndContext>(_getPathAndContext, new {archiveDate, model=name})
        .ConfigureAwait(false))
        .First();
    var useGzip = !_whitelist.Contains(typeof(T));
    using (var fs = new SqlFileStream(continuation.Path, continuation.Context, FileAccess.Write,
        FileOptions.SequentialScan | FileOptions.Asynchronous, 64*1024))
    using (var buffer = useGzip ? new GZipStream(fs, CompressionLevel.Optimal) : default(Stream))
    {
        _serializerModel.Serialize(stream ?? fs, item);
    }
    dbConn.Commit();
}

Gzip仅在达到阈值之后

在序列化过程中,您可以使用中间流来完成您所要求的内容。像这样的东西可以完成的工作

class SerializationOutputStream : Stream
{
    Stream outputStream, writeStream;
    byte[] buffer;
    int bufferedCount;
    long position;
    public SerializationOutputStream(Stream outputStream, int compressTreshold = 8 * 1024)
    {
        writeStream = this.outputStream = outputStream;
        buffer = new byte[compressTreshold];
    }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return writeStream != null &&  writeStream.CanWrite; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position { get { return position; } set { throw new NotSupportedException(); } }
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return;
        var newPosition = position + count;
        if (this.buffer == null)
            writeStream.Write(buffer, offset, count);
        else
        {
            int bufferCount = Math.Min(count, this.buffer.Length - bufferedCount);
            if (bufferCount > 0)
            {
                Array.Copy(buffer, offset, this.buffer, bufferedCount, bufferCount);
                bufferedCount += bufferCount;
            }
            int remainingCount = count - bufferCount;
            if (remainingCount > 0)
            {
                writeStream = new GZipStream(outputStream, CompressionLevel.Optimal);
                try
                {
                    writeStream.Write(this.buffer, 0, this.buffer.Length);                            
                    writeStream.Write(buffer, offset + bufferCount, remainingCount);
                }
                finally { this.buffer = null; }
            }
        }
        position = newPosition;
    }
    public override void Flush()
    {
        if (buffer == null)
            writeStream.Flush();
        else if (bufferedCount > 0)
        {
            try { outputStream.Write(buffer, 0, bufferedCount); }
            finally { buffer = null; }
        }
    }
    protected override void Dispose(bool disposing)
    {
        try
        {
            if (!disposing || writeStream == null) return;
            try { Flush(); }
            finally { writeStream.Close(); }
        }
        finally
        {
            writeStream = outputStream = null;
            buffer = null;
            base.Dispose(disposing);
        }
    }
}

并像这个一样使用它

using (var stream = new SerializationOutputStream(new SqlFileStream(continuation.Path, continuation.Context, FileAccess.Write,
        FileOptions.SequentialScan | FileOptions.Asynchronous, 64*1024)))
    _serializerModel.Serialize(stream, item);

数据集可能非常大(10mb+)

在大多数设备上,这并不是很大。在决定是否压缩之前,是否有原因不能读取整个对象?还请注意@Niklas的建议,在决定是否压缩之前,先读取一个缓冲区的数据(例如8K)。

这是因为一个小文件,当被压缩时,实际上可以比未压缩的文件大。

使小文件可能更大的因素是ZIP标头,尤其是字典。某些ZIP库允许您在压缩和解压缩时使用已知的自定义字典。多年前,我就使用过SharpZipLib。

在编码和测试方面,使用这种方法需要付出更多的努力。如果你觉得这个好处是值得的,它可能会提供最好的方法。

请注意,无论采用何种路径,都将使用存储设备块大小的倍数来物理存储数据。

如果对象是1字节或100mb,我不知道

请注意,协议缓冲区并不是真正为大型数据集设计的

协议缓冲区不是为处理大型消息而设计的。根据一般经验,如果您处理的每条消息都大于一兆字节,那么可能是时候考虑另一种策略了。

也就是说,协议缓冲区非常适合处理大型数据集中的单个消息。通常,大型数据集实际上只是小块的集合,其中每个小块都可能是结构化的数据块。

如果最大的对象可以轻松地序列化到内存中,那么首先将其序列化到MemoryStream中,然后将该MemoryStream写入最终目的地,或者通过GZipStream运行它,然后再运行到最终目的地。如果最大的对象不能轻松地序列化到内存中,我不确定该给出什么进一步的建议。