可观察到的.带有任务延迟的定时器或TPL

本文关键字:定时器 延迟 TPL 任务 观察 | 更新日期: 2023-09-27 18:10:26

我有一个需求,在初始延迟10秒之后,我需要每10分钟执行一次SomeMethod,但是有一个问题是,在SomeMethod完成后,10分钟计时器应该启动。下面是一个简单的例子:

Start Task 00:00:00
(10 second delay)
SomeMethod executed at 00:00:10 (takes 15 minutes)
(10 minute delay)
SomeMethod executed at 00:25:10 
... and so on.

我知道如何使用TPL做到这一点。我可以使用task启动任务。延迟并执行SomeMethod,然后在每次完成(ContinueWith TaskStatus.RanToCompletion)后,我创建一个新任务并再次执行SomeMethod

我的问题是,这是可能的使用Observable.Timer ?类似…

Observable.Timer(TimeSpan.FromSeconds(10), TimeSpan.FromMinutes(10))

这段代码的问题是,如果SomeMethod需要15分钟,我将有两个不同的SomeMethod实例运行,这是我不想要的。我希望在 SomeMethod完成后启动10分钟计时器。这是可能的使用Observable或我应该留在TPL?

EDIT:忘了说我想要SomeMethod在它自己的线程中运行。

可观察到的.带有任务延迟的定时器或TPL

您应该对使用Observable.Timer做更多的调查。它的工作原理几乎就像你想直接打开盒子一样。

关于Rx需要理解的一件重要的事情是,它保证永远不会并发执行单个订阅。虽然Rx可以启用各种多线程场景,但它总是序列化订阅。

那么,以这个可观察订阅为例:

Observable
    .Timer(TimeSpan.FromSeconds(10.0), TimeSpan.FromSeconds(2.0))
    .Timestamp()
    .Subscribe(x =>
    {
        Thread.Sleep(5000);
        Console.WriteLine(x.ToString());
    });

我已经创建了一个可观察对象,它将等待10秒开始发射值,然后将尝试每2秒发射一个值。

然后我添加了.Timestamp()来准确记录何时产生值。

最后,我订阅了一个强制5秒线程睡眠的观察者。

以下是前4个值的输出:

0@2015-08-31 10:44:34 +00:00
1@2015-08-31 10:44:39 +00:00
2@2015-08-31 10:44:44 +00:00
3@2015-08-31 10:44:49 +00:00

您会注意到两个值之间的间隔是5秒。这已经很接近你想要的了。Rx看到两秒钟已经过去了,并立即执行下一个值。

但是还有另一个Rx操作符可以完全满足你的需求——.Generate(...)。这是一个非常强大的操作符,可以生成所有类型的可观察流。

你可以这样使用:

Observable
    .Generate(0, x => true, x => x + 1, x => x,
        x => x == 0 ? TimeSpan.FromSeconds(10.0) : TimeSpan.FromSeconds(2.0))
    .Timestamp()
    .Subscribe(x =>
    {
        Thread.Sleep(5000);
        Console.WriteLine(x.ToString());
    });

在这种情况下,它完全按照您想要的方式工作。下面是前十个值:

0@2015-08-31 10:48:27 +00:00
1@2015-08-31 10:48:34 +00:00
2@2015-08-31 10:48:41 +00:00
3@2015-08-31 10:48:48 +00:00
4@2015-08-31 10:48:55 +00:00
5@2015-08-31 10:49:02 +00:00
6@2015-08-31 10:49:09 +00:00
7@2015-08-31 10:49:16 +00:00
8@2015-08-31 10:49:23 +00:00
9@2015-08-31 10:49:30 +00:00

每7秒发射一次。2来自生成操作符,5来自观察者。

你显然可以把你需要的时间写进去

假设SomeMethod在完成时发出OnCompleted事件,我们可以将其编写为Observable

//If SomeMethod OnCompleted conforms to .NET Event Pattern
var completedObservable = Observable.FromEventPattern<OnCompletedEventArgs>(
            e => this.OnCompleted += e,
            e => this.OnCompleted += e);
//Subscribe to OnCompleted events
var repeatDisposable = completedObservable.Subscribe(_ => 
                                Observable.Timer(TimeSpan.FromMinutes(10))
                                          .Subscribe(_ => SomeMethod()));
//Start condition
var starterDisposable = Observable.Timer(TimeSpan.FromSeconds(10))
                                  .Subscribe(_ => SomeMethod());