WCF MSMQ 侦听器服务删除消息,但不处理

本文关键字:处理 消息 删除 MSMQ 侦听器 服务 WCF | 更新日期: 2023-09-27 18:34:31

我有一个WCF服务,它在MSMQ中创建一条私人消息。消息创建正常,我可以看到客户端应用程序正在创建的消息。

我还有一个似乎运行良好的 MSMQ 侦听器服务。当我启动服务时,它似乎成功"处理"了队列中的消息并将它们删除。但是,我的侦听器服务的实现似乎没有得到执行。

我对 MSMQ 相当陌生,我不知道为什么从队列中删除消息,以及为什么我的侦听器方法中的代码没有被执行。

以下是我的服务课程...

[ServiceContract(Namespace = ServiceConstants.NAMESPACE, Name = "IOrderService")]
public interface IOrderQueueProcessingService
{
    [OperationContract(IsOneWay = true, Action = "*")]
    void ProcessOrderQueue(MsmqMessage<string> message);
}
public abstract class OrderQueueProcessingServiceBase : ServiceBase, IOrderQueueProcessingService
{
    #region CONSTRUCTORS
    protected OrderQueueProcessingServiceBase() { }
    protected OrderQueueProcessingServiceBase(List<EventRecord> existingList) : base(existingList) { }
    #endregion //CONSTRUCTORS
    #region IOrderQueueProcessingService Members

    [OperationBehavior(TransactionScopeRequired = false, TransactionAutoComplete = true)]
    public virtual void ProcessOrderQueue(MsmqMessage<string> message)
    {
        throw new NotImplementedException();
    }
    #endregion
}
public class OrderQueueProcessingService : OrderQueueProcessingServiceBase
{
    #region Constructors

    public OrderQueueProcessingService() {}
    public OrderQueueProcessingService(List<EventRecord> existingList) : base(existingList) { }

    #endregion
    /// <summary>
    ///     Processes any Orders in the Orders Queue
    /// </summary>
    /// <param name="message"></param>
    public override void ProcessOrderQueue(MsmqMessage<string> message)
    {
        var q = new MessageQueue(@".'private$'msmqdemo/submitorderservice.svc");
        q.Send("hey");
        /*
        using (new Tracer("OrderQueueProcessingService"))
        {
            // add data context to work with.
            using (var unitOfWork = new TLFDataContext())
            {
                var newOrderLines = new List<OrderLineDataContract>
                                {
                                    new OrderLineDataContract
                                        {
                                            C = "test",
                                            IC = "msw",
                                            Qty = 1,
                                            T = "volume" ,
                                            ED = DateTime.UtcNow.AddDays(5)
                                        }
                                };
                var newOrder = new OrderDataContract
                {
                    LIs = newOrderLines.AsEnumerable(),
                    PId = 9323,
                    POId = 8686,
                    S = "new"
                };
                var orderService = new OrderService();
                var createdOrder = orderService.CreateOrder(null, null, newOrder);
                //unitOfWork.SubmitUnitOfWork();
                //return null;
            }
        }*/

    }
}

注释掉了我最终尝试执行的代码,并将其替换为简单的 MSMQ 消息发送,以进行测试。这似乎应该可以正常工作。任何帮助将不胜感激。

配置设置如下...

<service name="ServiceImplementation.OrderQueueProcessingService">
    <host>
      <baseAddresses>
        <add baseAddress="https://localhost:9000/OrderQueueProcessingService.svc" />
      </baseAddresses>
    </host>
    <endpoint address="net.msmq://localhost/private/testingqueue/OrderQueueProcessingService.svc" binding="netMsmqBinding" bindingConfiguration="MsmqBindingNonTransactionalNoSecurity" contract="IOrderQueueProcessingService" />
  </service>

WCF MSMQ 侦听器服务删除消息,但不处理

我能够弄清楚我的问题。我没有对 QueueOnPeekComplted 事件进行任何事件处理。我将其添加到 Initialize 方法中,并且能够成功处理我的消息。我还添加了对无法处理的消息的处理。下面是我的 OrderQueueProcessingService 的新实现。

public class OrderQueueProcessingService : OrderQueueProcessingServiceBase, IDisposable
{
    #region Constructors
    public OrderQueueProcessingService()
    {
        Initialize(ConfigurationManager.AppSettings["OrderQueueProcessingQueueName"]);
    }
    public OrderQueueProcessingService(List<EventRecord> existingList) : base(existingList) {}
    #endregion
    #region Properties
    private MessageQueue Queue { get; set; }
    #endregion
    #region IDisposable Members
    public new void Dispose()
    {
        if (Queue == null) return;
        //unsubscribe and dispose
        Queue.PeekCompleted -= QueueOnPeekCompleted;
        Queue.Dispose();
    }
    #endregion
    private void Initialize(string queueName)
    {
        Queue = new MessageQueue(queueName, false, true, QueueAccessMode.Receive)
                    {
                            Formatter = new XmlMessageFormatter(new[] {typeof (OrderQueueDataContract)})
                    };
        //setup events and start.
        Queue.PeekCompleted += QueueOnPeekCompleted;
        Queue.BeginPeek();
    }
    private static void MoveMessageToDeadLetter(IDisposable message)
    {
        var q = new MessageQueue(ConfigurationManager.AppSettings["OrderProcessingDLQ"], QueueAccessMode.Send)
                    {
                            Formatter = new XmlMessageFormatter(new[] {typeof (OrderQueueDataContract)})
                    };
        q.Send(message, MessageQueueTransactionType.Single);
        q.Dispose();
    }
    /// <summary>
    ///     Processes the specified Order message
    /// </summary>
    /// <param name="orderMessage"></param>
    public override void ProcessOrderQueue(OrderQueueDataContract orderMessage)
    {
        using (var unitOfWork = new MyDataContext())
        {
            switch (orderMessage.M.ToLower())
            {
                case "create":
                    DataAccessLayer.CreateOrder(unitOfWork, orderMessage.O.TranslateToBe());
                    break;
                default:
                    break;
            }
        }
    }
    private void QueueOnPeekCompleted(object sender, PeekCompletedEventArgs peekCompletedEventArgs)
    {
        var asyncQueue = (MessageQueue) sender;
        using (var transaction = new MessageQueueTransaction())
        {
            transaction.Begin();
            try
            {
                using (var message = asyncQueue.ReceiveById(peekCompletedEventArgs.Message.Id, TimeSpan.FromSeconds(30), transaction))
                {
                    if (message != null) ProcessOrderQueue((OrderQueueDataContract) message.Body);
                }
            }
            catch (InvalidOperationException ex)
            {
                transaction.Abort();
            }
            catch (Exception ex)
            {
                transaction.Abort();
            }
            if (transaction.Status != MessageQueueTransactionStatus.Aborted) transaction.Commit();
            else
            {
                using (var message = asyncQueue.ReceiveById(peekCompletedEventArgs.Message.Id, TimeSpan.FromSeconds(30), transaction))
                {
                    if (message != null)
                    {
                        MoveMessageToDeadLetter(message);
                        message.Dispose();
                    }
                }
                EventLog.WriteEntry("OrderQueueProcessingService", "Could not process message: " + peekCompletedEventArgs.Message.Id);
            }
            transaction.Dispose();
        }
        asyncQueue.EndPeek(peekCompletedEventArgs.AsyncResult);
        asyncQueue.BeginPeek();
    }
}

有人看到此实现有任何问题吗?我不得不摆弄它很多,但它已经通过了单元测试和过程测试。

我注意到的一件事是,当我的服务第一次启动时,它会处理队列中的所有消息;这可能会阻碍我的其他服务的启动。这可能只是我在测试时在控制台应用中启动它们时的问题。为了安全起见,我最后启动这项服务。