如何在c#服务中实现多个发布调度
本文关键字:布调度 调度 服务 实现 | 更新日期: 2023-09-27 18:09:29
我有一个服务,它正在监听来自上游系统的位置更新。现在这个位置更新有多个消费者。
- 一个消费者想要尽快得到更新
- 一个消费者想要每30秒更新一次
- 当累积50个更新时,一个消费者想要获得更新
- 一个消费者希望在累积50次更新或30秒时获得更新。
以上内容可以随时更改或添加新的变体?
如何使其可配置,可扩展,以及我应该使用哪种编程方法。
我正在用c#开发Window Service
听起来你在描述这样一个场景:服务是发布源(服务本身是订阅者)之间的中介,服务将此信息重新广播给N订阅者,但是根据他们的时间表。
因此,假设更新是单个位置更新,而不是某种聚合,如滚动平均或缓冲(例如,只是每30秒一次的汽车的最新位置,而不是过去30秒以来的所有位置),那么您需要为每个订阅者维护一些信息:
- 订阅
- 。谁是消费者?我如何通知它?(如回调、回复队列等) 规范
- 。消费者需要什么,什么时候需要?(例如每50个滴答)
- 自上次发送以来的更新次数
- …
当服务接收更新时,对于每个消费者,它必须根据来自源的每次更新的状态评估规范;比如:
if (consumer.Spec.Matches(consumer.State, updateMessage)
SendUpdate(consumer.Subscription.Callback, updateMessage)
以上假设你的规范可以被服务直接执行(即消费者在进程中,或者规范被序列化并且可以被服务反序列化)。如果这个不是的情况,你的规范可能代表一个DSL(例如,一个可解析的表示,服务器可以编译成它可以执行)。另一种方法是将规范视为指令集。例如,
public enum FrequencyUnit
{
SecondsSinceLastSend,
UpdatesSinceLastSend,
}
public class Frequency
{
public double Value { get; set; }
public FrequencyUnit Unit { get; set; }
}
public class Operator
{
Every, // Unary: e.g. every update; every 10 sec; every 5 updates
Or, // Nary: e.g. every 50 or every 20 sec (whichever's first)
And, // Nary: e.g. 19 messages and 20 sec have passed
// etc.
}
public class UpdateSpec
{
public Frequency[] Frequencies { get; set; }
public Operator Operator { get; set; }
}
这些非常灵活,可以在服务器代码中配置,或者通过读取XML或其他东西来构建。这些也可以在注册时从消费者本身传递给服务。例如,IService.Register()
可以公开接收订阅和规范的接口。
最后一点是可伸缩性。我描述了针对消费者的每次更新的服务循环。这不能很好地扩展,因为循环可能会阻止从源接收更新,或者如果与源异步,至少可能会比处理更新更快地积累更新。
处理这个问题的策略是在为每个订阅者维护的信息中添加一个内部队列。服务在接收到更新后,将其加入到每个内部队列中。然后,服务任务(基于tpl)、线程池线程或长寿命线程将像上面那样脱离队列并评估更新。这里有很多可能的变化和优化