响应式框架使用BlockingCollection作为消息队列

本文关键字:消息 队列 BlockingCollection 框架 响应 | 更新日期: 2023-09-27 18:07:11

我最近一直在做一些关于响应式框架的工作,到目前为止我非常喜欢它。我正在考虑用一些经过过滤的IObservables来替换传统的轮询消息队列,以清理服务器操作。在旧的方法中,我像这样处理进入服务器的消息:

// Start spinning the process message loop
   Task.Factory.StartNew(() =>
   {
       while (true)
       {
           Command command = m_CommandQueue.Take();
           ProcessMessage(command);
       }
   }, TaskCreationOptions.LongRunning);

这会导致一个连续的轮询线程,该线程将来自客户端的命令委托给ProcessMessage方法,其中我有一系列if/else-if语句,这些语句确定命令的类型并根据其类型委托工作

我用一个事件驱动的系统来代替它,我已经写了下面的代码:

 private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
 private IObservable<BesiegedMessage> m_MessagePublisher;
 m_MessagePublisher = m_MessageQueue
       .GetConsumingEnumerable()
       .ToObservable(TaskPoolScheduler.Default);
        // All generic Server messages (containing no properties) will be processed here
 IDisposable genericServerMessageSubscriber = m_MessagePublisher
       .Where(message => message is GenericServerMessage)
       .Subscribe(message =>
       {
           // do something with the generic server message here
       }

我的问题是,虽然这是有效的,这是一个很好的做法,使用一个阻塞集合作为支持IObservable这样吗?我不知道Take()在哪里被以这种方式调用,这使我认为消息将在处理后不被删除而堆积在队列上?

将subject作为支持集合来驱动将拾取这些消息的过滤IObservables会更有效吗?我在这里还遗漏了什么可能对这个系统架构有益的东西吗?

响应式框架使用BlockingCollection作为消息队列

下面是一个完整的工作示例,在Visual Studio 2012下进行了测试。

  1. 创建一个新的c#控制台应用。右键单击你的项目,选择"Manage NuGet Packages",然后添加"Reactive Extensions - Main"库"。

添加c#代码:

using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace DemoRX
{
    class Program
    {
        static void Main(string[] args)
        {
            BlockingCollection<string> myQueue = new BlockingCollection<string>();
            {                
                IObservable<string> ob = myQueue.
                  GetConsumingEnumerable().
                  ToObservable(TaskPoolScheduler.Default);
                ob.Subscribe(p =>
                {
                    // This handler will get called whenever 
                    // anything appears on myQueue in the future.
                    Console.Write("Consuming: {0}'n",p);                    
                });
            }
            // Now, adding items to myQueue will trigger the item to be consumed
            // in the predefined handler.
            myQueue.Add("a");
            myQueue.Add("b");
            myQueue.Add("c");           
            Console.Write("[any key to exit]'n");
            Console.ReadKey();
        }
    }
}

您将在控制台上看到如下内容:

[any key to exit]
Consuming: a
Consuming: b
Consuming: c

使用RX的真正好处是你可以使用LINQ的全部功能来过滤掉任何不想要的消息。例如,添加一个.Where子句来通过"a"进行过滤,并观察发生了什么:

ob.Where(o => (o == "a")).Subscribe(p =>
{
    // This will get called whenever something appears on myQueue.
    Console.Write("Consuming: {0}'n",p);                    
});

哲学笔记

与启动专用线程来轮询队列相比,此方法的优点是,一旦程序退出,您不必担心正确处置线程。这意味着你不需要使用IDisposable或CancellationToken(在处理BlockingCollection时总是需要的,否则你的程序可能会挂在exit上,而线程拒绝死亡)。

相信我,编写完全健壮的代码来使用来自BlockingCollection的事件并不像你想象的那么容易。我更喜欢使用RX方法,如上所示,因为它更干净,更健壮,代码更少,并且可以使用LINQ进行过滤。

延迟

我对这个方法的速度感到惊讶。

在我的至强X5650 @ 2.67Ghz上,处理1000万个事件需要5秒,每个事件大约需要0.5微秒。将项目放入BlockingCollection需要4.5秒,因此RX将它们取出并处理它们的速度几乎与它们进入的速度一样快。

线程

在我所有的测试中,RX只启动一个线程来处理队列上的任务。

这意味着我们有一个非常好的模式:我们可以使用RX从多个线程收集传入数据,将它们放入共享队列,然后在单个线程上处理队列内容(根据定义,这是线程安全的)。

这种模式通过一个队列将数据的生产者和消费者解耦,在处理多线程代码时消除了大量令人头痛的问题,其中生产者可以是多线程的,而消费者是单线程的,因此是线程安全的。这是使Erlang如此健壮的概念。有关此模式的更多信息,请参阅简单得可笑的多线程。

这是直接从我的后路拉出来的东西-任何真正的解决方案都将非常很大程度上取决于您的实际使用,但这里是"最便宜的伪消息队列系统":

思想/动机:

  • 故意暴露IObservable<T>,以便订户可以进行任何过滤/交叉订阅
  • 整个队列是无类型的,但RegisterPublish是类型安全的
  • YMMV与Publish()在它所在的位置-尝试移动它
  • 一般来说,Subject是一个禁忌,尽管在这种情况下,它确实使一些简单的代码。
  • 可以将注册"内部化"以实际执行订阅,但随后队列将需要管理创建的IDisposables -呸,让您的消费者处理它!

代码:

public class TheCheapestPubSubEver
{    
    private Subject<object> _inner = new Subject<object>();
    public IObservable<T> Register<T>()
    {
        return _inner.OfType<T>().Publish().RefCount();
    }
    public void Publish<T>(T message)
    {
        _inner.OnNext(message);
    }
}

用法:

void Main()
{
    var queue = new TheCheapestPubSubEver();
    var ofString = queue.Register<string>();
    var ofInt = queue.Register<int>();
    using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
    using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
    {
        queue.Publish("Foo");
        queue.Publish(1);
        Console.ReadLine();
    }
}
输出:

A string! Foo
An int! 1

然而,这并没有严格强制"消费消费者"——一个特定类型的多个寄存器会导致多个观察者调用——也就是说:

var queue = new TheCheapestPubSubEver();
var ofString = queue.Register<string>();
var anotherOfString = queue.Register<string>();
var ofInt = queue.Register<int>();
using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}", s)))
{
    queue.Publish("Foo");
    queue.Publish(1);
    Console.ReadLine();
}

结果:

A string! Foo
Another string! Foo
An int! 1

我没有在这种情况下使用BlockingCollection -所以我是'猜测' -你应该运行它来批准,反驳。

BlockingCollection可能只会使这里的事情进一步复杂化(或者提供很少的帮助)。看看乔恩的这篇文章——简单地确认一下。GetConsumingEnumerable将为您提供"每个订户"可枚举值。最终耗尽它们——这是Rx要考虑的。

同时IEnumerable<>.ToObservable进一步使"源"变平。当它工作时(你可以查找源代码——我推荐使用w/Rx)——每个订阅都会创建一个自己的"枚举器"——所以所有订阅都会得到他们自己版本的提要。我真的不确定,在这种可观察情境中,这是如何实现的。

无论如何-如果你想提供应用程序范围的消息-在我看来,你需要引入Subject或其他形式的状态(例如发布等)。从这个意义上说,我不认为BlockingCollection会有任何帮助——但是,你最好自己尝试一下。

注释(哲学注释)

如果你想组合消息类型,或者组合不同的来源——例如在一个更"真实"的场景中——它会变得更复杂。我必须说这很有趣。

注意让它们"扎根"到一个单一的共享流(并避免Jer正确建议的)。

我建议你不要试图逃避使用Subject。对于你所需要的,那就是你的朋友——不管所有与状态无关的讨论以及Subject有多糟糕——你实际上拥有了一个状态(你需要一个"状态")——Rx在"事后"启动,所以你可以享受它的好处。

我鼓励你这样做,因为我喜欢它的结果。

我这里的问题是,我们已经把一个队列(我通常与一个消费者的破坏性读取相关联,特别是如果你正在使用BlockingCollection)变成一个广播(发送给任何人和每个人现在听)。

这似乎是两个相互矛盾的想法。

我见过这样做,但后来被扔掉了,因为它是"错误问题的正确解决方案"。