RX设计-对外部系统进行节流

本文关键字:系统 设计 对外部 RX | 更新日期: 2023-09-27 18:28:06

我正在编写一个消息代理程序,它可以利用IBM MQ和文件系统上的文件夹。在接收到消息后,它将它们具体化为强类型类,并将它们插入RX主题。

我已经建立了对消息的感知,使我能够识别哪些外部系统需要被击中来处理它们,这样我就可以对RX可观察性进行查询,并选择不针对外部系统的消息,等等。

我下一步要做的是通过外部系统来抑制消息,例如:

如果我用某种类型的消息访问CRM系统,并且我决定我想用最多4个并发呼叫访问该系统,我一次只能处理4条消息,如果我有第5条消息,我将不得不等待前4条消息中的一条完成,然后转到第5条。其他类型的资源也是如此,如外部数据库、其他外部web服务等。

我已经开始研究这个问题,到目前为止,最好的设计方法是编写自己的调度器。不利的一面是,我必须编写自己的内部结构,在消息被提取后,在调度器内对消息进行排队,而这正是我不喜欢这种方法的地方。

有人有更好的方法吗?

RX设计-对外部系统进行节流

您所描述的似乎是最大并发性。Merge操作符支持这样的东西。

您需要使用类似GroupBy的东西来根据流的去向来分割流,然后在每个分割块上使用具有最大并发性的Merge,最后使用Merge将结果重新组合在一起。类似这样的东西:

IObservable<T> requests = ...;
requests.GroupBy(request => PickExternalSystem(request))
    .Select(group => group // group.Key is the TExternalSystem
        .Select(request => Observable.Defer(() => group.Key.ExecuteAsync(request)))
        .Merge(maxConcurrency: group.Key.MaxConcurrency))
    .Merge() // merge the results of each group back together again
    .Subscribe(result => ...);

您可能需要研究ReactiveUI,它包括用于服务请求的速率调节机制。看见http://blog.paulbetts.org/index.php/2011/01/15/reactivexaml-is-now-reactiveui-2-0/

我也在MSDN上发布了同样的问题,并通过Merge运算符的不同实现得到了更深入的答案,这样当最大并发值发生变化时就不会发生数据丢失。