在RX中向多个订阅发布

本文关键字:RX | 更新日期: 2023-09-27 17:50:49

我正在研究如何为一个项目开发一个插件框架,Rx似乎很适合我想要实现的目标。最终,该项目将是一组插件(模块化功能),可以通过xml配置来做不同的事情。需求如下

  1. 即使在插件中也要强制模块化架构。这鼓励了松耦合,并潜在地最小化了复杂性。这有望使单个插件功能更容易建模和测试
  2. 加强数据的不变性,以降低复杂性,并确保模块内的状态管理保持在最低限度
  3. 通过提供线程池线程来尽可能地在模块中完成工作来阻止手动创建线程

在我看来,插件本质上是一个数据转换实体。这意味着一个插件要么

  • 接收一些数据并以某种方式将其转换为新数据(此处未显示)
  • 自身生成数据并将其推送给观察者
  • 接收一些数据并在数据上做一些工作,而不通知外部

如果你进一步理解这个概念,一个插件可以包含上述所有三种类型。例如,在一个插件中,你可以有一个IntGenerator模块来生成一些数据给ConsoleWorkUnit模块等等。因此,我试图在主函数中建模的是插件必须完成其工作的布线。

为此,我使用来自Microsoft的Immutable内核创建了以下基类。我想要实现的是抽象掉Rx调用,这样它们就可以在模块中使用,所以最终的目标是在抽象类中封装调用缓冲区等,可以用来组成复杂的查询和模块。这样,代码就比实际读取模块内的所有代码来查找它订阅了一个类型为x的缓冲区或窗口等更具有自我文档性。

public abstract class OutputBase<TOutput> : SendOutputBase<TOutput>
{
    public abstract void Work();
}
public interface IBufferedBase<TOutput>
{
    void Work(IList<ImmutableList<Data<TOutput>>> list);
}
public abstract class BufferedWorkBase<TInput> : IBufferedBase<TInput>
{
    public abstract void Work(IList<ImmutableList<Data<TInput>>> input);
}
public abstract class SendOutputBase<TOutput>
{
    private readonly ReplaySubject<ImmutableList<Data<TOutput>>> _outputNotifier;
    private readonly IObservable<ImmutableList<Data<TOutput>>> _observable;
    protected SendOutputBase()
    {
        _outputNotifier = new ReplaySubject<ImmutableList<Data<TOutput>>>(10);
        _observable  =  _outputNotifier.SubscribeOn(ThreadPoolScheduler.Instance);
        _observable = _outputNotifier.ObserveOn(ThreadPoolScheduler.Instance);
    }
    protected void SetOutputTo(ImmutableList<Data<TOutput>> output)
    {
        _outputNotifier.OnNext(output);
    }
    public void ConnectOutputTo(IWorkBase<TOutput> unit)
    {
        _observable.Subscribe(unit.Work);
    }
    public void BufferOutputTo(int count, IBufferedBase<TOutput> unit)
    {
        _observable.Buffer(count).Subscribe(unit.Work);
    }
}
public abstract class WorkBase<TInput> : IWorkBase<TInput>
{
    public abstract void Work(ImmutableList<Data<TInput>> input);
}
public interface IWorkBase<TInput>
{
    void Work(ImmutableList<Data<TInput>> input);
}
public class Data<T>
{
    private readonly T _value;
    private Data(T value)
    {
        _value = value;
    }
    public static Data<TData> Create<TData>(TData value)
    {
        return new Data<TData>(value);
    }
    public T Value { get { return _value; } }
}

这些基类用于创建三个类;一个用于生成一些int型数据,一个用于在数据出现时打印出来,最后一个用于在数据出现时对其进行缓冲,并将值以3为单位求和。

public class IntGenerator : OutputBase<int>
{
    public override void Work()
    {
        var list = ImmutableList<Data<int>>.Empty;
        var builder = list.ToBuilder();
        for (var i = 0; i < 1000; i++)
        {
            builder.Add(Data<int>.Create(i));
        }
        SetOutputTo(builder.ToImmutable());
    }
}
public class ConsoleWorkUnit : WorkBase<int>
{
    public override void Work(ImmutableList<Data<int>> input)
    {
        foreach (var data in input)
        {
            Console.WriteLine("ConsoleWorkUnit printing {0}", data.Value);
        }
    }
}
public class SumPrinter : WorkBase<int>
{
    public override void Work(ImmutableList<Data<int>> input)
    {
        input.ToObservable().Buffer(2).Subscribe(PrintSum);
    }
    private void PrintSum(IList<Data<int>> obj)
    {
      Console.WriteLine("Sum of {0}, {1} is {2} ", obj.First().Value,obj.Last().Value ,obj.Sum(x=>x.Value) );
    }
}

这些是在像这样的主程序中运行的

        var intgen = new IntGenerator();
        var cons = new ConsoleWorkUnit();
        var sumPrinter = new SumPrinter();
        intgen.ConnectOutputTo(cons);
        intgen.BufferOutputTo(3,sumPrinter);
        Task.Factory.StartNew(intgen.Work);
        Console.ReadLine();

这个架构合理吗?

在RX中向多个订阅发布

你正在缓冲你的观察对象(.Buffer(count)),以便它只在count通知到达后发出信号。

然而,你的IntGenerator.DoWork只产生一个值。因此,您永远不会"填满"缓冲区并触发下游通知。

要么改变DoWork,使其最终产生更多的值,要么让它在完成其工作时完成可观察流。当流完成时,Buffer将释放剩余的缓冲值。要做到这一点,这意味着IntGenerator.DoWork需要在某个地方引起对_outputNotifier.OnCompleted()的调用

相关文章:
  • 没有找到相关文章