用Rx将多个事件源组合成一个IObservable
本文关键字:IObservable 一个 组合 Rx 事件源 | 更新日期: 2023-09-27 18:15:06
这是一个关于如何在特定事件相关场景中使用响应式扩展(Reactive Extensions, Rx)的问题。
- 目的是采取一些类来触发一些事件
- 并将它们聚集到一个
IObservable
中,可以由任何客户端订阅(不知道事件类)。 - 注意感兴趣的事件使用子类
EventArgs
一些自定义EventArgs
public class HappenedEventArgs : EventArgs
{
internal bool IsBadNotGood;
}
发生事件的许多独立类
public class EventSourceA : IEventSource {
public event HappenedEventHandler Happened;
private void OnHappened(HappenedEventArgs e)
{
if (Happened != null)
Happened(this, e);
}
// And then this class calls OnHappened(e) whenever it decides to ...
}
public class EventSourceB : IEventSource {
public event HappenedEventHandler Happened;
private void OnHappened(HappenedEventArgs e)
{
if (Happened != null)
Happened(this, e);
}
// And then this class also calls OnHappened(e) at times ...
}
public interface IEventSource
{
event HappenedEventHandler Happened;
}
public delegate void HappenedEventHandler(object sender, HappenedEventArgs e);
如何聚集所有这些事件并暴露一个统一的事件前线
public class Pooler{
private IObservable<X> _pool;
public IObservable<X> Subscribe(){
return _pool;
}
public void Register(IEventSource item)
{
// How to take item.Happened and inject/bind it into _pool here?
}
internal void Unregister(IEventSource item)
{
// Disconnect item.Happened from _pool
}
public Pooler(){
// Instantiate _pool to whatever is best?
// _pool = ...
}
}
不直接了解事件源的订阅者
static void Try() {
var pooler = new Pooler();
pooler.Subscribe().Subscribe(e =>
{
// Do something with events here, as they arrive
}
);
// ....
// Wherever whenever:
AddEventSources(pooler);
}
static void AddEventSources(Pooler pooler){
var eventSourceA = new EventSourceA();
pooler.Register(eventSourceA);
var eventSourceB = new EventSourceB();
pooler.Register(eventSourceB);
}
Rx库试图提供的是处理此类情况的方法,而不必创建一堆手动传播可观察对象的类/方法。
假设你有一个带有事件的类:
public class EventedClass
{
public event Action<EventArgs> Event;
}
和一个可枚举的实例IEnumerable<EventedClass> objects
,您可以使用LINQ从这些类中投影可观察对象,将它们与Observable.Merge
结合,这将为您提供这些事件的组合顺序输出。
Observable.Merge(
objects.Select(
o => Observable.FromEvent<EventArgs>(
handler => o.Event += handler,
handler => o.Event -= handler
)
)).Subscribe(args =>
{
//do stuff
});
听起来你在做类似于这个问题的事情。基本上,您希望使用主题作为_pool
变量,并让它订阅和取消订阅Register和Unregister中的不同事件源。要注销一个源,您需要保留在Register调用中获得的一次性材料。此外,我会考虑让Pooler
直接实现IObservable
,并将Subscribe
转发给_pool
变量。
using System.Reactive.Subjects;
using System.Reactive.Linq;
public class Pooler
: IObservable<HappenedEventArgs>,
IDisposable
{
void Dispose()
{
if (_pool != null) _pool.Dispose();
if (_sourceSubs != null)
{
foreach (var d in _sourceSubs.Values)
{
d.Dispose();
}
_sourceSubs.Clear();
}
}
private Subject<HappenedEventArgs> _pool = new Subject<HappenedEventArgs>();
private Dictionary<IEventSource, IDisposable> _sourceSubs = new Dictionary<IEventSource, IDisposable>();
public IDisposable Subscribe(IObserver<HappenedEventArgs> observer)
{
return _pool.Subscribe(observer);
}
public void Register(IEventSource item)
{
if (_sourceSubs.ContainsKey(item))
{
return; //already registered
}
else
{
_sourceSubs.Add(item,
Observable.FromEventPattern((EventHandler<HappenedEventArgs> h) => item.Happened += h,
h => item.Happened -= h)
.Select(ep => ep.EventArgs)
.Subscribe(_pool));
}
}
internal void Unregister(IEventSource item)
{
IDisposable disp = null;
if (_sourceSubs.TryGetValue(item, out disp))
{
_sourceSubs.Remove(item);
disp.Dispose();
}
}
}
请注意,您需要实现IDisposable
,以确保在完成Pooler
后可以清除所有事件订阅。