使用Rx对我不想在特定时间执行的操作进行排队
本文关键字:操作 排队 执行 定时间 Rx 我不想 使用 | 更新日期: 2023-09-27 18:06:35
总结:我有一个在业务对象上执行工作流的web应用程序,有时需要在步骤之间故意等待几秒钟或几分钟。我正在寻找(也许通过Rx.NET),改善这些工作流程的执行,这样我就不会耗尽ThreadPool,使网站在系统负载过重时无响应。
一个非常简化的工作流版本是:
- 创建对象
- 从系统A中加载数据
- POST此数据到系统B
如果系统A关闭,我的应用程序等待并稍后重试。等待时间是根据GMail在重试中不断升级的延迟来建模的:等待1秒,每次后续重试都要等待一倍(最多1小时)。这个应用程序痴迷地将状态保存到数据库中,所以如果整个应用程序崩溃了,当它重新启动时,它将恢复它离开的所有工作流。
当前(请温和)工作流中的每个步骤都是通过调用ThreadPool来执行的。QueueUserWorkItem将调用Thread的方法排队。如果需要为上面描述的重试延迟休眠,然后实际执行该步骤。
如果系统运行良好(没有错误),它可以很容易地处理我们扔给它的所有流量,ThreadPool很好地管理所有这些工作流实例的并行执行。但是如果系统B关闭了一段时间,重试计数和延迟就会增加,很快线程池就会被所有的休眠线程填满,导致网站对新请求没有响应。
本质上,我想把所有这些挂起的工作流扔到一个按(最后执行时间+期望的重试延迟)排序的队列中。尽管我读了很多关于Rx的文章,并对它感到很兴奋,但我从来没有机会使用它,但它似乎是处理这个问题的一种有用的方法。如果Rx可以神奇地在这些物体准备发射时把它们吐出来,那么它似乎会
- 大大简化和澄清了这个逻辑,并且
- 防止大量线程的浪费使用,这些线程99%的时间都处于睡眠状态
对Rx新手的任何指导都将是非常感激的,即使它只是解释为什么这实际上不是Rx的一个好用例。
在这种情况下,我可能会坚持当前的解决方案,因为这一点:
应用程序痴迷地将状态保存到数据库中,所以如果整个应用程序崩溃,当它重新启动时,它将恢复它离开的所有工作流。
在启动时通过反序列化"恢复"管道(即x.Where().Select().Timeout().Bla()
)是…棘手的。
如果没有更多的信息,很难给你一个更详细的解决方案,如果你不尝试对整个流程建模,它实际上可能与Rx一起工作得很好,只是交易位(即从a加载,发送到B)。
无论如何,解决线程池耗尽的方法是通过System.Threading.Timer类,它告诉线程池在将新项目排队之前等待超时。你一定要适应:
public IDisposable StartProcess<T>(Action<T> load, Action<T> post) where T : new()
{
return StartProcess(TimeSpan.FromSeconds(1), new T())
.Do(load)
.Subscribe(post);
}
private IObservable<long> StartProcess<T>(TimeSpan span, T obj) where T : new()
{
Observable
.Interval(span)
.OnErrorResumeNext(Observable.Defer(() => StartProcess(IncreaseSpan(span), obj)))
.Concat(Observable.Defer(() => StartProcess(TimeSpan.FromSeconds(1), new T())));
}
private TimeSpan IncreaseSpan(TimeSpan span)
{
return TimeSpan.FromSeconds(span.TotalSeconds < 1800? span.TotalSeconds * 2 : 3600);
}
现在我宁愿加载实例化和填充对象,而不是显式地这样做,因为函数式编程不鼓励可变性,你可能希望load
实际上去数据库并恢复状态,就像你提到的。
我不确定你是否想保留状态对象,以防对post
或load
的调用崩溃,你需要适应,因为目前,它将保留load
或post
崩溃的状态,如果post
崩溃,它将再次调用load
而没有一个新的状态,这可能绝对不是你想要做的。
我没有测试代码,但是Rx适合你想做的事情。
查看Rx论坛上的这篇文章。对于想要解决的问题来说,这是一个非常方便的操作符:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/af43b14e-fb00-42d4-8fb1-5c45862f7796/
Rx是处理这类问题(特别是)的好方法,因为你可以有你的异步函数/可观察对象,并对它们应用通用操作符,如所描述的重试操作符。