将流的监听器转换为IObservable

本文关键字:IObservable 转换 监听器 | 更新日期: 2023-09-27 18:14:37

我的service方法接受一个侦听器,并使用数据流调用它。我正在尝试将这个流转换为IObservable<T>。到目前为止,我所做的是:

public class MessageListener : IMessageListener
{
    private readonly Subject<string> stream = new Subject<string>();
    public IObservable<string> MessageStream
    {
        get
        {
            return this.stream;
        }
    }
    public void OnMessageAdded(string message)
    {
        this.stream.OnNext(message);
    }
}
//Calling code    
public IObservable<string> GetMessage()
{
     var listener = new MessageListener();
     service.Subscribe(listener);
     listener.MessageStream.SubscribeOn(Scheduler.NewThread);
}

我不确定这是否足够好。我相信对SubscribeOn的调用将只在新线程上运行订阅代码。我如何确保OnMessageAdded被一个新线程接收?

将流的监听器转换为IObservable

正如您已经发现的那样,SubscribeOn将只控制订阅发生的位置。但是,在您的代码中,SubscribeOn没有任何作用,因为它后面没有订阅。记住,SubscribeOn返回一个用于订阅的Observable。你不能调用"SubscribeOn"来在源代码中设置一些全局标志或类似的东西。

你真正想做的是在订阅发生之前在你的服务中调用"ObserveOn"。ObserveOn定义了处理传入消息的线程。另一种选择是将ObserveOn直接写入MessageListener,使其看起来像这样:

public IObservable<string> MessageStream
{
    get
    {
        return this.stream.ObserveOn(Scheduler.ThreadPool).AsObservable();
    }
}

另一件要注意的事情是,在MessageStream属性中最好调用this.stream.AsObservable()而不是直接返回主题。