响应式框架使用BlockingCollection作为消息队列
本文关键字:消息 队列 BlockingCollection 框架 响应 | 更新日期: 2023-09-27 18:07:11
我最近一直在做一些关于响应式框架的工作,到目前为止我非常喜欢它。我正在考虑用一些经过过滤的IObservables来替换传统的轮询消息队列,以清理服务器操作。在旧的方法中,我像这样处理进入服务器的消息:
// Start spinning the process message loop
Task.Factory.StartNew(() =>
{
while (true)
{
Command command = m_CommandQueue.Take();
ProcessMessage(command);
}
}, TaskCreationOptions.LongRunning);
这会导致一个连续的轮询线程,该线程将来自客户端的命令委托给ProcessMessage方法,其中我有一系列if/else-if语句,这些语句确定命令的类型并根据其类型委托工作
我用一个事件驱动的系统来代替它,我已经写了下面的代码:
private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
private IObservable<BesiegedMessage> m_MessagePublisher;
m_MessagePublisher = m_MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default);
// All generic Server messages (containing no properties) will be processed here
IDisposable genericServerMessageSubscriber = m_MessagePublisher
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something with the generic server message here
}
我的问题是,虽然这是有效的,这是一个很好的做法,使用一个阻塞集合作为支持IObservable这样吗?我不知道Take()在哪里被以这种方式调用,这使我认为消息将在处理后不被删除而堆积在队列上?
将subject作为支持集合来驱动将拾取这些消息的过滤IObservables会更有效吗?我在这里还遗漏了什么可能对这个系统架构有益的东西吗?
下面是一个完整的工作示例,在Visual Studio 2012下进行了测试。
- 创建一个新的c#控制台应用。右键单击你的项目,选择"Manage NuGet Packages",然后添加"Reactive Extensions - Main"库"。
添加c#代码:
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace DemoRX
{
class Program
{
static void Main(string[] args)
{
BlockingCollection<string> myQueue = new BlockingCollection<string>();
{
IObservable<string> ob = myQueue.
GetConsumingEnumerable().
ToObservable(TaskPoolScheduler.Default);
ob.Subscribe(p =>
{
// This handler will get called whenever
// anything appears on myQueue in the future.
Console.Write("Consuming: {0}'n",p);
});
}
// Now, adding items to myQueue will trigger the item to be consumed
// in the predefined handler.
myQueue.Add("a");
myQueue.Add("b");
myQueue.Add("c");
Console.Write("[any key to exit]'n");
Console.ReadKey();
}
}
}
您将在控制台上看到如下内容:
[any key to exit]
Consuming: a
Consuming: b
Consuming: c
使用RX的真正好处是你可以使用LINQ的全部功能来过滤掉任何不想要的消息。例如,添加一个.Where
子句来通过"a"进行过滤,并观察发生了什么:
ob.Where(o => (o == "a")).Subscribe(p =>
{
// This will get called whenever something appears on myQueue.
Console.Write("Consuming: {0}'n",p);
});
哲学笔记与启动专用线程来轮询队列相比,此方法的优点是,一旦程序退出,您不必担心正确处置线程。这意味着你不需要使用IDisposable或CancellationToken(在处理BlockingCollection时总是需要的,否则你的程序可能会挂在exit上,而线程拒绝死亡)。
相信我,编写完全健壮的代码来使用来自BlockingCollection的事件并不像你想象的那么容易。我更喜欢使用RX方法,如上所示,因为它更干净,更健壮,代码更少,并且可以使用LINQ进行过滤。
延迟我对这个方法的速度感到惊讶。
在我的至强X5650 @ 2.67Ghz上,处理1000万个事件需要5秒,每个事件大约需要0.5微秒。将项目放入BlockingCollection需要4.5秒,因此RX将它们取出并处理它们的速度几乎与它们进入的速度一样快。
线程在我所有的测试中,RX只启动一个线程来处理队列上的任务。
这意味着我们有一个非常好的模式:我们可以使用RX从多个线程收集传入数据,将它们放入共享队列,然后在单个线程上处理队列内容(根据定义,这是线程安全的)。
这种模式通过一个队列将数据的生产者和消费者解耦,在处理多线程代码时消除了大量令人头痛的问题,其中生产者可以是多线程的,而消费者是单线程的,因此是线程安全的。这是使Erlang如此健壮的概念。有关此模式的更多信息,请参阅简单得可笑的多线程。
这是直接从我的后路拉出来的东西-任何真正的解决方案都将非常很大程度上取决于您的实际使用,但这里是"最便宜的伪消息队列系统":
思想/动机:
- 故意暴露
IObservable<T>
,以便订户可以进行任何过滤/交叉订阅 - 整个队列是无类型的,但
Register
和Publish
是类型安全的 - YMMV与
Publish()
在它所在的位置-尝试移动它 一般来说, - 可以将注册"内部化"以实际执行订阅,但随后队列将需要管理创建的
IDisposables
-呸,让您的消费者处理它!
Subject
是一个禁忌,尽管在这种情况下,它确实使一些简单的代码。代码:
public class TheCheapestPubSubEver
{
private Subject<object> _inner = new Subject<object>();
public IObservable<T> Register<T>()
{
return _inner.OfType<T>().Publish().RefCount();
}
public void Publish<T>(T message)
{
_inner.OnNext(message);
}
}
用法:
void Main()
{
var queue = new TheCheapestPubSubEver();
var ofString = queue.Register<string>();
var ofInt = queue.Register<int>();
using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
{
queue.Publish("Foo");
queue.Publish(1);
Console.ReadLine();
}
}
输出:A string! Foo
An int! 1
然而,这并没有严格强制"消费消费者"——一个特定类型的多个寄存器会导致多个观察者调用——也就是说:
var queue = new TheCheapestPubSubEver();
var ofString = queue.Register<string>();
var anotherOfString = queue.Register<string>();
var ofInt = queue.Register<int>();
using(ofInt.Subscribe(i => Console.WriteLine("An int! {0}", i)))
using(ofString.Subscribe(s => Console.WriteLine("A string! {0}", s)))
using(anotherOfString.Subscribe(s => Console.WriteLine("Another string! {0}", s)))
{
queue.Publish("Foo");
queue.Publish(1);
Console.ReadLine();
}
结果:
A string! Foo
Another string! Foo
An int! 1
我没有在这种情况下使用BlockingCollection
-所以我是'猜测' -你应该运行它来批准,反驳。
BlockingCollection
可能只会使这里的事情进一步复杂化(或者提供很少的帮助)。看看乔恩的这篇文章——简单地确认一下。GetConsumingEnumerable
将为您提供"每个订户"可枚举值。最终耗尽它们——这是Rx要考虑的。
同时IEnumerable<>.ToObservable
进一步使"源"变平。当它工作时(你可以查找源代码——我推荐使用w/Rx)——每个订阅都会创建一个自己的"枚举器"——所以所有订阅都会得到他们自己版本的提要。我真的不确定,在这种可观察情境中,这是如何实现的。
无论如何-如果你想提供应用程序范围的消息-在我看来,你需要引入Subject
或其他形式的状态(例如发布等)。从这个意义上说,我不认为BlockingCollection会有任何帮助——但是,你最好自己尝试一下。
注释(哲学注释)
如果你想组合消息类型,或者组合不同的来源——例如在一个更"真实"的场景中——它会变得更复杂。我必须说这很有趣。
注意让它们"扎根"到一个单一的共享流(并避免Jer正确建议的)。
我建议你不要试图逃避使用Subject
。对于你所需要的,那就是你的朋友——不管所有与状态无关的讨论以及Subject有多糟糕——你实际上拥有了一个状态(你需要一个"状态")——Rx在"事后"启动,所以你可以享受它的好处。
我鼓励你这样做,因为我喜欢它的结果。
我这里的问题是,我们已经把一个队列(我通常与一个消费者的破坏性读取相关联,特别是如果你正在使用BlockingCollection)变成一个广播(发送给任何人和每个人现在听)。
这似乎是两个相互矛盾的想法。
我见过这样做,但后来被扔掉了,因为它是"错误问题的正确解决方案"。