将流的监听器转换为IObservable
本文关键字:IObservable 转换 监听器 | 更新日期: 2023-09-27 18:14:37
我的service方法接受一个侦听器,并使用数据流调用它。我正在尝试将这个流转换为IObservable<T>
。到目前为止,我所做的是:
public class MessageListener : IMessageListener
{
private readonly Subject<string> stream = new Subject<string>();
public IObservable<string> MessageStream
{
get
{
return this.stream;
}
}
public void OnMessageAdded(string message)
{
this.stream.OnNext(message);
}
}
//Calling code
public IObservable<string> GetMessage()
{
var listener = new MessageListener();
service.Subscribe(listener);
listener.MessageStream.SubscribeOn(Scheduler.NewThread);
}
我不确定这是否足够好。我相信对SubscribeOn
的调用将只在新线程上运行订阅代码。我如何确保OnMessageAdded被一个新线程接收?
正如您已经发现的那样,SubscribeOn将只控制订阅发生的位置。但是,在您的代码中,SubscribeOn没有任何作用,因为它后面没有订阅。记住,SubscribeOn返回一个用于订阅的Observable。你不能调用"SubscribeOn"来在源代码中设置一些全局标志或类似的东西。
你真正想做的是在订阅发生之前在你的服务中调用"ObserveOn"。ObserveOn
定义了处理传入消息的线程。另一种选择是将ObserveOn
直接写入MessageListener,使其看起来像这样:
public IObservable<string> MessageStream
{
get
{
return this.stream.ObserveOn(Scheduler.ThreadPool).AsObservable();
}
}
另一件要注意的事情是,在MessageStream属性中最好调用this.stream.AsObservable()
而不是直接返回主题。