MSMQ ReceiveById failing
本文关键字:failing ReceiveById MSMQ | 更新日期: 2023-09-27 18:09:22
我有以下代码
public class MsmqQueueProvider : IQueueProvider
{
public void VeryfyIsAvailable(string name)
{
var queueAddress = string.Format(@" .'private$'{0}", name);
var message = "There was a problem while starting the NEasyMessaging.";
if (MessageQueue.Exists(queueAddress))
{
using (var queue = new MessageQueue(queueAddress))
{
if (queue.CanWrite && queue.CanRead) return;
if (queue.CanRead == false)
{
message += string.Format("Queue {0} is reachable but not readable", queueAddress);
throw new QueueProviderProviderException(message);
}
message += string.Format("Queue {0} is reachable but not writable", queueAddress);
throw new QueueProviderProviderException(message);
}
}
message += string.Format("Queue {0} cannot be found", queueAddress);
throw new QueueProviderProviderException(message);
}
public QueueMessage Peek(string queueName)
{
var queue = new MessageQueue(string.Format(@" .'private$'{0}", queueName), QueueAccessMode.Peek);
var message = queue.Peek();
// ReSharper disable once PossibleNullReferenceException
return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
}
public QueueMessage Receive(string queueName)
{
var queue = new MessageQueue(string.Format(@" .'private$'{0}", queueName), QueueAccessMode.Receive);
var message = queue.Receive(MessageQueueTransactionType.Automatic);
// ReSharper disable once PossibleNullReferenceException
return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
}
public QueueMessage ReceiveById(string queueName, string messageId)
{
var queue = new MessageQueue(string.Format(@" .'private$'{0}", queueName), QueueAccessMode.Receive);
var message = queue.ReceiveById(messageId, MessageQueueTransactionType.Automatic);
// ReSharper disable once PossibleNullReferenceException
return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
}
public void QueueMessage(string messageContent, string messageName, string queueName)
{
var queueAddress = string.Format(@" .'private$'{0}", queueName);
using (var streamReader = new StringReader(messageContent))
{
var message = new Message
{
TimeToBeReceived = Message.InfiniteTimeout,
TimeToReachQueue = Message.InfiniteTimeout,
Label = messageName,
UseAuthentication = false,
Recoverable = true
};
using (var queue = new MessageQueue(queueAddress, QueueAccessMode.Send))
{
using (var streamWriter = new StreamWriter(message.BodyStream))
{
streamWriter.Write(streamReader.ReadToEnd());
streamWriter.Flush();
queue.Send(message, MessageQueueTransactionType.Automatic);
}
queue.Close();
}
}
}
}
public class UnitOfWork : IUnitOfWork
{
private TransactionScope _transaction;
public void Start()
{
var transactionOptions = new TransactionOptions
{
Timeout = TransactionManager.MaximumTimeout
};
_transaction = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions);
}
public void CompletedWithSuccess()
{
if (Transaction.Current.TransactionInformation.Status == TransactionStatus.Active)
{
_transaction.Complete();
}
_transaction.Dispose();
}
public void CompletedWithFail()
{
_transaction.Dispose();
}
}
public sealed partial class Service : ServiceBase
{
private readonly ILog _log = LogManager.GetLogger(typeof(Service));
private readonly ManualResetEvent _shutdownEvent = new ManualResetEvent(false);
private Thread _workerThread;
private IQueueProvider _queueProvider;
private IEndpointConfiguration _configuration;
private IContainer _container;
public Service()
{
InitializeComponent();
ServiceName = "";
EventLog.Log = "";
}
public void Init()
{
var endpointBootstrap = new EndpointBootstrap();
endpointBootstrap.Initialize();
_container = endpointBootstrap.IocContainer;
_queueProvider = _container.Resolve<IQueueProvider>();
_configuration = _container.Resolve<IEndpointConfiguration>();
_workerThread = new Thread(DoWork) { Name = "Worker Thread", IsBackground = true };
_workerThread.Start();
}
protected override void OnStart(string[] args)
{
Init();
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_workerThread.Join(3000))
{
_workerThread.Abort();
}
}
private void DoWork()
{
while (!_shutdownEvent.WaitOne(0))
{
var queueMessage = _queueProvider.Peek(_configuration.QueueName);
try
{
ProcessMessage(queueMessage);
}
catch (Exception ex)
{
_log.Error(ex);
MoveMessageToErrorQueue(queueMessage.Id);
}
}
}
private void ProcessMessage(QueueMessage message)
{
using (var dependencyScope = _container.BeginLifetimeScope())
{
var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();
unitOfWork.Start();
var messageProcessor = new MessageProcessor(dependencyScope);
try
{
messageProcessor.HandleMessage(message);
_queueProvider.ReceiveById(_configuration.QueueName, message.Id);
}
catch (Exception ex)
{
_log.Error(ex);
unitOfWork.CompletedWithFail();
throw;
}
unitOfWork.CompletedWithSuccess();
}
}
private void MoveMessageToErrorQueue(string messageId)
{
try
{
using (var dependencyScope = _container.BeginLifetimeScope())
{
var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();
unitOfWork.Start();
var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);
try
{
_queueProvider.QueueMessage(message.Body, message.Name, _configuration.QueueErrorName);
unitOfWork.CompletedWithSuccess();
}
catch
{
unitOfWork.CompletedWithFail();
throw;
}
}
}
catch (Exception ex)
{
_log.Error(ex);
}
}
}
基本上我的想法很简单,至少在纸面上是这样。消息从队列中取出,在开发机器上一切正常,问题是当我们将代码部署到服务器(Windows 2008)时。如果消息没有得到正确处理,我们将消息从队列中移除并将其放入错误队列,问题是GetById方法无法找到消息:
private void MoveMessageToErrorQueue(string messageId)
var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);
它在开发盒上工作得很好,但我们只能找到一种方法来解决这个问题。
欢迎任何帮助。
感谢更新
跟随Paul的评论:
嗨,Paul,谢谢你的帮助。不幸的是,除非我理解错了什么,甚至不开始接收将做。现在我从队列中选择一条消息,由于只有一个线程从该队列中读取消息,如果稍后通过id接收消息,那么消息仍然存在似乎是合乎逻辑的。所以为什么我认为我需要按照他们的方式去做。我查看消息,然后创建一个事务范围并进行任何处理,当然,在执行期间创建的任何sql server会话都将注册该事务。如果在消息处理期间出现错误,我需要回滚对数据库所做的更改,为此我回滚事务,但我还需要将失败消息放入错误队列中。我不能只在一个事务中,从队列中删除消息,尝试处理消息,如果失败,则将其放入错误队列,因为我仍然需要回滚数据库更改。而不是Peek,然后尝试通过id等获取消息,为什么不使用BeginReceive/Receive或类似的方法-这样你已经有消息了。
只要确保你设置队列属性包括消息体(不记得这是否是接收等的默认值)
MSDN -消息队列。BeginReceive方法MSDN -消息队列。接收方法
(编辑…)
如果你想更好地看到你的消息等尝试下载MSMQ检查器(是的,这是我的工具,我创建了它为这些类型的场景,即发生了什么?!)
如果你使用它,打开"恒定窥视模式",你应该看到按摩进来。我还会为你正在使用的队列启用日志,这样当消息被处理时,你就可以在日志中看到它们等等。也许可以仔细检查消息id是否在你不期望的时候被修改。此外,不同的操作系统和MSMQ运行时间和设置可能会导致不同的行为……如果不运行代码很难说(从你发布的内容来看,我们可能可以!!)
PK: -)