在c#中发送/接收GZip压缩的MSMQ消息

本文关键字:压缩 GZip MSMQ 消息 接收 | 更新日期: 2023-09-27 18:06:48

我正在尝试将大对象(>30MB)发送到MSMQ队列。由于我们正在尝试发送大量的数据,我们的想法是在发送之前对对象进行GZip压缩,然后在接收端解压缩。

然而,将压缩流写入message.BodyStream属性似乎可以工作,但不能从那里读取它。我不知道怎么了。

Message l_QueueMessage = new Message();
l_QueueMessage.Priority = priority;
using (MessageQueue l_Queue = CreateQueue())
{
    GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress);
    Formatter.Serialize(stream, message);
    l_Queue.Send(l_QueueMessage);
}

Formatter是BinaryFormatter类型的全局属性。这用于序列化/反序列化到我们想要发送/接收的对象类型。"ProductItem。"

接收端是这样的:

GZipStream stream = new GZipStream(l_Message.BodyStream, CompressionMode.Decompress);
object decompressedObject = Formatter.Deserialize(stream);
ProductItem l_Item = decompressedObject as ProductItem;
m_ProductReceived(sender, new MessageReceivedEventArgs<ProductItem>(l_Item));
l_ProductQueue.BeginReceive();

我得到一个试图反序列化的EndOfStreamException "{"Unable to read beyond the end of the stream."}在System.IO.BinaryReader.ReadByte ()

使用messageBodyStream属性,我实际上绕过了消息。Formatter,我没有初始化为任何东西,因为我在GZipStream中使用我自己的服务器/服务器机制。然而,我不确定这样做是否正确。

我错过了什么?谢谢!

在c#中发送/接收GZip压缩的MSMQ消息

在您的原始代码中,问题是您需要关闭GZipStream以便正确编写GZip页脚,只有这样您才能发送它。如果你不这样做,你最终会发送无法反序列化的字节。这也是为什么稍后完成发送的新代码可以正常工作的原因。

好了,我成功了。关键是将接收器上的解压缩流转换为字节[]数组。然后反序列化开始工作。

发送方代码(注意在发送消息之前流是关闭的):

using (MessageQueue l_Queue = CreateQueue())
{
    using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress, true))
    {
        Formatter.Serialize(stream, message);
    }
    l_Queue.Send(l_QueueMessage);
}

接收端(注意我是如何将流转换为字节[]然后反序列化的):

using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Decompress))
{
    byte[] bytes = ReadFully(stream);
    using (MemoryStream ms = new MemoryStream(bytes))
    {
        decompressedObject = Formatter.Deserialize(ms);
    }
}

仍然,不知道为什么使用ReadFully()函数而不是Stream.CopyTo()可以工作。有人吗?

顺便说一句,ReadFully()是一个从流中创建byte[]的函数。我要感谢Jon Skeet在http://www.yoda.arachsys.com/csharp/readbinary.html上的贡献。谢谢!

尝试将压缩和发送分开:

byte[] binaryBuffer = null;
using (MemoryStream compressedBody = new MemoryStream()) 
{
    using(GZipStream stream = new GZipStream(compressedBody, CompressionMode.Compress))
    {
        Formatter.Serialize(compressedBody, message); 
        binaryBuffer = compressedBody.GetBuffer();
    }
}
using (MessageQueue l_Queue = CreateQueue())        
{            
    l_QueueMessage.BodyStream.Write(binaryBuffer, 0, binaryBuffer.Length);
    l_QueueMessage.BodyStream.Seek(0, SeekOrigin.Begin);
    l_Queue.Send(l_QueueMessage);
}