MSMQ ReceiveById failing

本文关键字:failing ReceiveById MSMQ | 更新日期: 2023-09-27 18:09:22

我有以下代码

public class MsmqQueueProvider : IQueueProvider
{
    public void VeryfyIsAvailable(string name)
    {
        var queueAddress = string.Format(@" .'private$'{0}", name);
        var message = "There was a problem while starting the NEasyMessaging.";
        if (MessageQueue.Exists(queueAddress))
        {
            using (var queue = new MessageQueue(queueAddress))
            {
                if (queue.CanWrite && queue.CanRead) return;
                if (queue.CanRead == false)
                {
                    message += string.Format("Queue {0} is reachable but not readable", queueAddress);
                    throw new QueueProviderProviderException(message);
                }
                message += string.Format("Queue {0} is reachable but not writable", queueAddress);
                throw new QueueProviderProviderException(message);
            }
        }
        message += string.Format("Queue {0} cannot be found", queueAddress);
        throw new QueueProviderProviderException(message);
    }
    public QueueMessage Peek(string queueName)
    {
        var queue = new MessageQueue(string.Format(@" .'private$'{0}", queueName), QueueAccessMode.Peek);
        var message = queue.Peek();
        // ReSharper disable once PossibleNullReferenceException
        return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
    }
    public QueueMessage Receive(string queueName)
    {
        var queue = new MessageQueue(string.Format(@" .'private$'{0}", queueName), QueueAccessMode.Receive);
        var message = queue.Receive(MessageQueueTransactionType.Automatic);
        // ReSharper disable once PossibleNullReferenceException
        return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
    }
    public QueueMessage ReceiveById(string queueName, string messageId)
    {
        var queue = new MessageQueue(string.Format(@" .'private$'{0}", queueName), QueueAccessMode.Receive);
        var message = queue.ReceiveById(messageId, MessageQueueTransactionType.Automatic);
        // ReSharper disable once PossibleNullReferenceException
        return new QueueMessage(message.Id, message.Label, new StreamReader(message.BodyStream).ReadToEnd());
    }
    public void QueueMessage(string messageContent, string messageName, string queueName)
    {
        var queueAddress = string.Format(@" .'private$'{0}", queueName);
        using (var streamReader = new StringReader(messageContent))
        {
            var message = new Message
            {
                TimeToBeReceived = Message.InfiniteTimeout,
                TimeToReachQueue = Message.InfiniteTimeout,
                Label = messageName,
                UseAuthentication = false,
                Recoverable = true
            };
            using (var queue = new MessageQueue(queueAddress, QueueAccessMode.Send))
            {
                using (var streamWriter = new StreamWriter(message.BodyStream))
                {
                    streamWriter.Write(streamReader.ReadToEnd());
                    streamWriter.Flush();
                    queue.Send(message, MessageQueueTransactionType.Automatic);
                }
                queue.Close();
            }
        }
    }
}

public class UnitOfWork : IUnitOfWork
{
    private TransactionScope _transaction;

    public void Start()
    {
        var transactionOptions = new TransactionOptions
        {
            Timeout = TransactionManager.MaximumTimeout
        };
        _transaction = new TransactionScope(TransactionScopeOption.RequiresNew, transactionOptions);
    }
    public void CompletedWithSuccess()
    {
        if (Transaction.Current.TransactionInformation.Status == TransactionStatus.Active)
        {
            _transaction.Complete();
        }
        _transaction.Dispose();
    }
    public void CompletedWithFail()
    {
        _transaction.Dispose();
    }
}

public sealed partial class Service : ServiceBase
{
    private readonly ILog _log = LogManager.GetLogger(typeof(Service));
    private readonly ManualResetEvent _shutdownEvent = new ManualResetEvent(false);
    private Thread _workerThread;
    private IQueueProvider _queueProvider;
    private IEndpointConfiguration _configuration;
    private IContainer _container;
    public Service()
    {
        InitializeComponent();
        ServiceName = "";
        EventLog.Log = "";
    }

    public void Init()
    {
        var endpointBootstrap = new EndpointBootstrap();
        endpointBootstrap.Initialize();
        _container = endpointBootstrap.IocContainer;
        _queueProvider = _container.Resolve<IQueueProvider>();
        _configuration = _container.Resolve<IEndpointConfiguration>();
        _workerThread = new Thread(DoWork) { Name = "Worker Thread", IsBackground = true };
        _workerThread.Start();
    }
    protected override void OnStart(string[] args)
    {
        Init();
    }
    protected override void OnStop()
    {
        _shutdownEvent.Set();
        if (!_workerThread.Join(3000))
        {
            _workerThread.Abort();
        }
    }
    private void DoWork()
    {
        while (!_shutdownEvent.WaitOne(0))
        {
            var queueMessage = _queueProvider.Peek(_configuration.QueueName);
            try
            {
                ProcessMessage(queueMessage);
            }
            catch (Exception ex)
            {
                _log.Error(ex);
                MoveMessageToErrorQueue(queueMessage.Id);
            }
        }
    }
    private void ProcessMessage(QueueMessage message)
    {
        using (var dependencyScope = _container.BeginLifetimeScope())
        {
            var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();
            unitOfWork.Start();
            var messageProcessor = new MessageProcessor(dependencyScope);
            try
            {
                messageProcessor.HandleMessage(message);
                _queueProvider.ReceiveById(_configuration.QueueName, message.Id);
            }
            catch (Exception ex)
            {
                _log.Error(ex);
                unitOfWork.CompletedWithFail();
                throw;
            }
            unitOfWork.CompletedWithSuccess();
        }
    }
    private void MoveMessageToErrorQueue(string messageId)
    {
        try
        {
            using (var dependencyScope = _container.BeginLifetimeScope())
            {
                var unitOfWork = dependencyScope.Resolve<IUnitOfWork>();
                unitOfWork.Start();
                var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);
                try
                {
                    _queueProvider.QueueMessage(message.Body, message.Name, _configuration.QueueErrorName);
                    unitOfWork.CompletedWithSuccess();
                }
                catch
                {
                    unitOfWork.CompletedWithFail();
                    throw;
                }
            }
        }
        catch (Exception ex)
        {
            _log.Error(ex);
        }
    }
}
基本上我的想法很简单,至少在纸面上是这样。消息从队列中取出,在开发机器上一切正常,问题是当我们将代码部署到服务器(Windows 2008)时。如果消息没有得到正确处理,我们将消息从队列中移除并将其放入错误队列,问题是GetById方法无法找到消息:
private void MoveMessageToErrorQueue(string messageId)
var message = _queueProvider.ReceiveById(_configuration.QueueName, messageId);

它在开发盒上工作得很好,但我们只能找到一种方法来解决这个问题。

欢迎任何帮助。

感谢

更新

跟随Paul的评论:

嗨,Paul,谢谢你的帮助。不幸的是,除非我理解错了什么,甚至不开始接收将做。现在我从队列中选择一条消息,由于只有一个线程从该队列中读取消息,如果稍后通过id接收消息,那么消息仍然存在似乎是合乎逻辑的。所以为什么我认为我需要按照他们的方式去做。我查看消息,然后创建一个事务范围并进行任何处理,当然,在执行期间创建的任何sql server会话都将注册该事务。如果在消息处理期间出现错误,我需要回滚对数据库所做的更改,为此我回滚事务,但我还需要将失败消息放入错误队列中。我不能只在一个事务中,从队列中删除消息,尝试处理消息,如果失败,则将其放入错误队列,因为我仍然需要回滚数据库更改。

MSMQ ReceiveById failing

而不是Peek,然后尝试通过id等获取消息,为什么不使用BeginReceive/Receive或类似的方法-这样你已经有消息了。

只要确保你设置队列属性包括消息体(不记得这是否是接收等的默认值)

MSDN -消息队列。BeginReceive方法MSDN -消息队列。接收方法

(编辑…)

如果你想更好地看到你的消息等尝试下载MSMQ检查器(是的,这是我的工具,我创建了它为这些类型的场景,即发生了什么?!)

如果你使用它,打开"恒定窥视模式",你应该看到按摩进来。我还会为你正在使用的队列启用日志,这样当消息被处理时,你就可以在日志中看到它们等等。也许可以仔细检查消息id是否在你不期望的时候被修改。此外,不同的操作系统和MSMQ运行时间和设置可能会导致不同的行为……如果不运行代码很难说(从你发布的内容来看,我们可能可以!!)

PK: -)