Windows服务通过msmq进行通信——我需要服务总线吗?

本文关键字:服务 总线 通信 msmq Windows | 更新日期: 2023-09-27 18:10:40

我有这样的问题,系统中包含节点(windows服务),这些节点推送要处理的消息,而其他节点则拉消息并处理它们。

这种方式的设计方式是,推送节点通过维护队列的循环列表和每次发送后轮换队列来平衡队列之间的负载。因此消息1将进入队列1,消息2将进入队列2,以此类推。到目前为止,这部分工作得很好。

在消息提取端,我们将其设计为以类似的方式检索消息——首先从队列1,然后从队列2等。理论上,每个拉节点位于不同的机器上,而在实践中,到目前为止,它只侦听单个队列。但最近的一个需求让我们在一台机器上有一个拉节点,它侦听多个队列:一个通常非常繁忙,充满了数百万条消息,另一个通常只包含少量消息。

我们面临的问题是,我们最初构建拉节点的方式是从一个队列到另一个队列,直到找到一条消息。如果超时(比如一秒钟后),它就会转移到下一个队列。

这不再工作了,因为Q1(充满了数百万条消息)将延迟大约每条消息一秒钟,因为每次从Q1提取消息后,我们将向Q2请求消息(如果它不包含任何消息,我们将等待一秒钟)。

它是这样的

Q1包含10条消息,Q2没有消息

  • 拉节点从Q1
  • 请求消息
  • Q1立即返回消息
  • 拉节点从Q1
  • 请求消息
  • ------------ 等待第二个 ------------- ( Q2是空的,请求超时)
  • 拉节点从Q1
  • 请求消息
  • Q1立即返回消息
  • 拉节点从Q1
  • 请求消息
  • ------------ 等待第二个 ------------- ( Q2是空的,请求超时)

等。

所以这显然是错误的。

我想我在这里寻找最好的建筑解决方案。消息处理不需要尽可能实时,但需要健壮,并且不应该丢失任何消息!

我想听听你对这个问题的看法。

提前感谢Yannis

Windows服务通过msmq进行通信——我需要服务总线吗?

也许您可以在MessageQueue类中使用receivecomplecompleted事件?不需要轮询。

最后创建了一组线程——每个需要处理的msmq对应一个线程。在构造函数中,我初始化了这些线程:

Storages.ForEach(queue =>
        {
            Task task = Task.Factory.StartNew(() =>
            {
                LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType());
                while (true)
                {
                    WorkItem mime = queue.WaitAndRetrieve();
                    if (mime != null)
                    {
                        _Semaphore.WaitOne();
                        _LocalStorage.Enqueue(mime);
                        lock (_locker) Monitor.Pulse(_locker);
                        LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType());
                    }
                }
            });
        });
  • _LocalStorage是一个线程安全的队列实现(ConcurrentQueue在。net 4.0中引入)

  • 信号量是一个计数信号量,用于控制_LocalStorage中的插入。_LocalStorage基本上是接收消息的缓冲区,但我们不希望它在处理节点忙于工作时变得太大。结果可能是,我们检索了_LocalStorage中所有的msmq消息,但忙于处理其中的5个左右。这在弹性方面(如果程序意外终止,我们会丢失所有这些消息)和性能方面都很糟糕,因为在内存中保存所有这些项目的内存消耗将是巨大的。所以我们需要控制_LocalStorage缓冲队列中有多少项。

  • 我们脉冲线程等待工作(见下文),一个新的项目被添加到队列通过做一个简单的监视器。脉冲

将工作项从队列中取出的代码如下:

lock (_locker)
            if (_LocalStorage.Count == 0) 
                Monitor.Wait(_locker);
        WorkItem result;
        if (_LocalStorage.TryDequeue(out result))
        {
            _Semaphore.Release();
            return result;
        }
        return null;

我希望这能帮助别人解决类似的问题。