在c#中发送/接收GZip压缩的MSMQ消息
本文关键字:压缩 GZip MSMQ 消息 接收 | 更新日期: 2023-09-27 18:06:48
我正在尝试将大对象(>30MB)发送到MSMQ队列。由于我们正在尝试发送大量的数据,我们的想法是在发送之前对对象进行GZip压缩,然后在接收端解压缩。
然而,将压缩流写入message.BodyStream
属性似乎可以工作,但不能从那里读取它。我不知道怎么了。
Message l_QueueMessage = new Message();
l_QueueMessage.Priority = priority;
using (MessageQueue l_Queue = CreateQueue())
{
GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress);
Formatter.Serialize(stream, message);
l_Queue.Send(l_QueueMessage);
}
Formatter是BinaryFormatter类型的全局属性。这用于序列化/反序列化到我们想要发送/接收的对象类型。"ProductItem。"
接收端是这样的:
GZipStream stream = new GZipStream(l_Message.BodyStream, CompressionMode.Decompress);
object decompressedObject = Formatter.Deserialize(stream);
ProductItem l_Item = decompressedObject as ProductItem;
m_ProductReceived(sender, new MessageReceivedEventArgs<ProductItem>(l_Item));
l_ProductQueue.BeginReceive();
我得到一个试图反序列化的EndOfStreamException "{"Unable to read beyond the end of the stream."}
在System.IO.BinaryReader.ReadByte ()
使用messageBodyStream属性,我实际上绕过了消息。Formatter,我没有初始化为任何东西,因为我在GZipStream中使用我自己的服务器/服务器机制。然而,我不确定这样做是否正确。
我错过了什么?谢谢!
在您的原始代码中,问题是您需要关闭GZipStream
以便正确编写GZip页脚,只有这样您才能发送它。如果你不这样做,你最终会发送无法反序列化的字节。这也是为什么稍后完成发送的新代码可以正常工作的原因。
好了,我成功了。关键是将接收器上的解压缩流转换为字节[]数组。然后反序列化开始工作。
发送方代码(注意在发送消息之前流是关闭的):
using (MessageQueue l_Queue = CreateQueue())
{
using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress, true))
{
Formatter.Serialize(stream, message);
}
l_Queue.Send(l_QueueMessage);
}
接收端(注意我是如何将流转换为字节[]然后反序列化的):
using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Decompress))
{
byte[] bytes = ReadFully(stream);
using (MemoryStream ms = new MemoryStream(bytes))
{
decompressedObject = Formatter.Deserialize(ms);
}
}
仍然,不知道为什么使用ReadFully()函数而不是Stream.CopyTo()可以工作。有人吗?
顺便说一句,ReadFully()是一个从流中创建byte[]的函数。我要感谢Jon Skeet在http://www.yoda.arachsys.com/csharp/readbinary.html上的贡献。谢谢!
尝试将压缩和发送分开:
byte[] binaryBuffer = null;
using (MemoryStream compressedBody = new MemoryStream())
{
using(GZipStream stream = new GZipStream(compressedBody, CompressionMode.Compress))
{
Formatter.Serialize(compressedBody, message);
binaryBuffer = compressedBody.GetBuffer();
}
}
using (MessageQueue l_Queue = CreateQueue())
{
l_QueueMessage.BodyStream.Write(binaryBuffer, 0, binaryBuffer.Length);
l_QueueMessage.BodyStream.Seek(0, SeekOrigin.Begin);
l_Queue.Send(l_QueueMessage);
}