用Rx将多个事件源组合成一个IObservable

本文关键字:IObservable 一个 组合 Rx 事件源 | 更新日期: 2023-09-27 18:15:06

这是一个关于如何在特定事件相关场景中使用响应式扩展(Reactive Extensions, Rx)的问题。

  • 目的是采取一些类来触发一些事件
  • 并将它们聚集到一个IObservable中,可以由任何客户端订阅(不知道事件类)。
  • 注意感兴趣的事件使用子类EventArgs

一些自定义EventArgs

public class HappenedEventArgs : EventArgs
{
    internal bool IsBadNotGood;
}

发生事件的许多独立类

public class EventSourceA : IEventSource {
    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class calls OnHappened(e) whenever it decides to ...
}
public class EventSourceB : IEventSource {
    public event HappenedEventHandler Happened;
    private void OnHappened(HappenedEventArgs e)
    {
        if (Happened != null)
            Happened(this, e);
    }
    // And then this class also calls OnHappened(e) at times ...
}
public interface IEventSource
{
    event HappenedEventHandler Happened;
}
public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);

如何聚集所有这些事件并暴露一个统一的事件前线

public class Pooler{
    private IObservable<X> _pool;
    public IObservable<X> Subscribe(){
        return _pool;        
    }
    public void Register(IEventSource item)
    {
        // How to take item.Happened and inject/bind it into _pool here?
    }        
    internal void Unregister(IEventSource item)
    {
        // Disconnect item.Happened from _pool
    }
    public Pooler(){
        // Instantiate _pool to whatever is best?
        // _pool = ...
    }
 }

不直接了解事件源的订阅者

 static void Try() {
     var pooler = new Pooler();
     pooler.Subscribe().Subscribe(e =>
            {
                 // Do something with events here, as they arrive
            }
     );
     // ....
     // Wherever whenever:
     AddEventSources(pooler);
 }
 static void AddEventSources(Pooler pooler){
     var eventSourceA = new EventSourceA();
     pooler.Register(eventSourceA);
     var eventSourceB = new EventSourceB();
     pooler.Register(eventSourceB);     
 }

用Rx将多个事件源组合成一个IObservable

Rx库试图提供的是处理此类情况的方法,而不必创建一堆手动传播可观察对象的类/方法。

假设你有一个带有事件的类:

public class EventedClass
{
    public event Action<EventArgs> Event;
}

和一个可枚举的实例IEnumerable<EventedClass> objects,您可以使用LINQ从这些类中投影可观察对象,将它们与Observable.Merge结合,这将为您提供这些事件的组合顺序输出。

Observable.Merge(
    objects.Select(
        o => Observable.FromEvent<EventArgs>(
            handler => o.Event += handler,
            handler => o.Event -= handler
        )
)).Subscribe(args => 
{ 
    //do stuff
});

听起来你在做类似于这个问题的事情。基本上,您希望使用主题作为_pool变量,并让它订阅和取消订阅Register和Unregister中的不同事件源。要注销一个源,您需要保留在Register调用中获得的一次性材料。此外,我会考虑让Pooler直接实现IObservable,并将Subscribe转发给_pool变量。

using System.Reactive.Subjects;
using System.Reactive.Linq;
public class Pooler 
    : IObservable<HappenedEventArgs>, 
      IDisposable
{
    void Dispose()
    {
        if (_pool != null) _pool.Dispose();
        if (_sourceSubs != null)
        {
            foreach (var d in _sourceSubs.Values)
            {
                d.Dispose();
            }
            _sourceSubs.Clear();
        }
    }
    private Subject<HappenedEventArgs> _pool = new Subject<HappenedEventArgs>();
    private Dictionary<IEventSource, IDisposable> _sourceSubs = new Dictionary<IEventSource, IDisposable>();
    public IDisposable Subscribe(IObserver<HappenedEventArgs> observer)
    {
        return _pool.Subscribe(observer);
    }
    public void Register(IEventSource item)
    {
        if (_sourceSubs.ContainsKey(item))
        {
            return; //already registered
        }
        else
        {
            _sourceSubs.Add(item,
                            Observable.FromEventPattern((EventHandler<HappenedEventArgs> h) => item.Happened += h,
                                                        h => item.Happened -= h)
                                      .Select(ep => ep.EventArgs)
                                      .Subscribe(_pool));
        }
    }
    internal void Unregister(IEventSource item)
    {
        IDisposable disp = null;
        if (_sourceSubs.TryGetValue(item, out disp))
        {
            _sourceSubs.Remove(item);
            disp.Dispose();
        }
    }
}

请注意,您需要实现IDisposable,以确保在完成Pooler后可以清除所有事件订阅。