响应式框架的多播和主题订阅问题
本文关键字:问题 框架 多播 响应 | 更新日期: 2023-09-27 18:07:20
我刚刚开始学习如何使用响应式框架,我正在努力能够多播发布到多个订阅者。
我有一切工作正常,像这样:
m_MessagePublisher = m_ServerClient.MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default);
var genericServerMessageSubscriber = m_MessagePublisher
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something here
}
但后来我意识到这不支持多播,当我试图附加另一个应该被相同消息击中的订阅者时,它不会触发。我一直在阅读。多播扩展,并试图找出主题如何发挥到这一点,但还没有能够让它工作:
var subject = new Subject<BesiegedMessage>();
var messagePublisher = m_ServerClient.MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default)
.Multicast(subject);
// All generic server messages are handled here
var genericServerMessageSubscriber = subject
.Where(message => message is GenericServerMessage)
.Subscribe(message =>
{
// do something here
}
但是现在没有一个订阅者被击中,包括一个之前工作良好的订阅者。为了能够正确地向多个订阅者进行多播,我在这里缺少什么?
更新:使用订阅(主题)而不是多播(主题)似乎正在多播工作,这让我非常困惑的是什么。多播()是
EDIT:
哈哈-我读得太快了-你问的要简单得多…也就是说,我认为下面的内容很重要,所以我就把它留下……所以,你的问题-试着添加这一行:
var messagePublisher = m_ServerClient.MessageQueue
.GetConsumingEnumerable()
.ToObservable(TaskPoolScheduler.Default)
.Multicast(subject)
// Here: connectable observables are a PITA...
.RefCount();
编辑:
嗯…如何描述Multicast
……我想我们还是举个例子吧:
假设你有一个这样的东西,你认为它会产生什么?
int delay = 100;
var source = Observable.Interval(TimeSpan.FromMilliseconds(delay));
var publishingFrontend = new Subject<string>();
// Here's "raw"
var rawStream = source;
using(rawStream.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(rawStream.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
由于您订阅的是原始流,新的订阅者基本上从头开始:
(如果你重新运行,这不会100%匹配,因为我用了Thread.Sleep
的方法,但应该接近)
0
1
2
Inner: 0
3
Inner: 1
4
5
6
7
8
9
嗯…因此,如果我们想要"在中游捆绑",我们使用Publish().RefCount()
模式:
var singleSource = source.Publish().RefCount();
using(singleSource.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(singleSource.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
产生如下内容:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
假设我们没有Publish()
操作符——我们怎么能模拟呢?
Console.WriteLine("Simulated Publish:");
// use a subject to proxy values...
var innerSubject = new Subject<long>();
// wire up the source to "write to" the subject
var innerSub = source.Subscribe(innerSubject);
var simulatedSingleSource = Observable.Create<long>(obs =>
{
// return subscriptions to the "proxied" subject
var publishPoint = innerSubject.Subscribe(obs);
return publishPoint;
});
运行这个,我们得到:
Simulated Publish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
哇!
但是还有另一种方法…
Console.WriteLine("MulticastPublish:");
var multicastPublish = source.Multicast(new Subject<long>()).RefCount();
using(multicastPublish.Subscribe(x => Console.WriteLine("{0}", x)))
{
Thread.Sleep(delay * 3);
using(multicastPublish.Subscribe(x => Console.WriteLine("Inner: {0}", x)))
{
Thread.Sleep(delay * 3);
}
Thread.Sleep(delay * 5);
}
输出:MulticastPublish:
0
1
2
Inner: 2
3
Inner: 3
4
Inner: 4
5
6
7
8
9
编辑:事实上,所有ConnectableObservable
生成扩展都依赖于Multicast
/Subject
配对:
Publish() => Multicast(new Subject<T>)
Replay() => Multicast(new ReplaySubject<T>)
PublishLast() => Multicast(new AsyncSubject<T>)