可观察到的.带有任务延迟的定时器或TPL
本文关键字:定时器 延迟 TPL 任务 观察 | 更新日期: 2023-09-27 18:10:26
我有一个需求,在初始延迟10秒之后,我需要每10分钟执行一次SomeMethod
,但是有一个问题是,在SomeMethod
完成后,10分钟计时器应该启动。下面是一个简单的例子:
Start Task 00:00:00
(10 second delay)
SomeMethod executed at 00:00:10 (takes 15 minutes)
(10 minute delay)
SomeMethod executed at 00:25:10
... and so on.
我知道如何使用TPL做到这一点。我可以使用task启动任务。延迟并执行SomeMethod
,然后在每次完成(ContinueWith TaskStatus.RanToCompletion
)后,我创建一个新任务并再次执行SomeMethod
。
我的问题是,这是可能的使用Observable.Timer
?类似…
Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(10))
这段代码的问题是,如果SomeMethod
需要15分钟,我将有两个不同的SomeMethod
实例运行,这是我不想要的。我希望在 SomeMethod
完成后启动10分钟计时器。这是可能的使用Observable
或我应该留在TPL?
EDIT:忘了说我想要SomeMethod
在它自己的线程中运行。
您应该对使用Observable.Timer
做更多的调查。它的工作原理几乎就像你想直接打开盒子一样。
关于Rx需要理解的一件重要的事情是,它保证永远不会并发执行单个订阅。虽然Rx可以启用各种多线程场景,但它总是序列化订阅。
那么,以这个可观察订阅为例:
Observable
.Timer(TimeSpan.FromSeconds(10.0), TimeSpan.FromSeconds(2.0))
.Timestamp()
.Subscribe(x =>
{
Thread.Sleep(5000);
Console.WriteLine(x.ToString());
});
我已经创建了一个可观察对象,它将等待10秒开始发射值,然后将尝试每2秒发射一个值。
然后我添加了.Timestamp()
来准确记录何时产生值。
最后,我订阅了一个强制5秒线程睡眠的观察者。
以下是前4个值的输出:
0@2015-08-31 10:44:34 +00:00
1@2015-08-31 10:44:39 +00:00
2@2015-08-31 10:44:44 +00:00
3@2015-08-31 10:44:49 +00:00
您会注意到两个值之间的间隔是5秒。这已经很接近你想要的了。Rx看到两秒钟已经过去了,并立即执行下一个值。
但是还有另一个Rx操作符可以完全满足你的需求——.Generate(...)
。这是一个非常强大的操作符,可以生成所有类型的可观察流。
你可以这样使用:
Observable
.Generate(0, x => true, x => x + 1, x => x,
x => x == 0 ? TimeSpan.FromSeconds(10.0) : TimeSpan.FromSeconds(2.0))
.Timestamp()
.Subscribe(x =>
{
Thread.Sleep(5000);
Console.WriteLine(x.ToString());
});
在这种情况下,它完全按照您想要的方式工作。下面是前十个值:
0@2015-08-31 10:48:27 +00:00
1@2015-08-31 10:48:34 +00:00
2@2015-08-31 10:48:41 +00:00
3@2015-08-31 10:48:48 +00:00
4@2015-08-31 10:48:55 +00:00
5@2015-08-31 10:49:02 +00:00
6@2015-08-31 10:49:09 +00:00
7@2015-08-31 10:49:16 +00:00
8@2015-08-31 10:49:23 +00:00
9@2015-08-31 10:49:30 +00:00
每7秒发射一次。2来自生成操作符,5来自观察者。
你显然可以把你需要的时间写进去
假设SomeMethod
在完成时发出OnCompleted
事件,我们可以将其编写为Observable
//If SomeMethod OnCompleted conforms to .NET Event Pattern
var completedObservable = Observable.FromEventPattern<OnCompletedEventArgs>(
e => this.OnCompleted += e,
e => this.OnCompleted += e);
//Subscribe to OnCompleted events
var repeatDisposable = completedObservable.Subscribe(_ =>
Observable.Timer(TimeSpan.FromMinutes(10))
.Subscribe(_ => SomeMethod()));
//Start condition
var starterDisposable = Observable.Timer(TimeSpan.FromSeconds(10))
.Subscribe(_ => SomeMethod());