在特定时间使用 C# 反应式扩展调用函数

本文关键字:反应式 扩展 调用 函数 定时间 | 更新日期: 2023-09-27 18:34:07

是否可以在特定时间使用响应式扩展调用函数?

例如,如果我想每天上午 9 点和下午 1 点准确地调用方法 foo((,我可以使用 Timer 类每隔几秒钟检查一下是上午 9 点还是下午 1 点,甚至是 Observable.Interval 函数。但是有没有更有效的方法呢?所以我不会每隔几秒钟检查一下是否该调用 foo((,而是一个将在适当时间自行调用 foo(( 的可观察量。

在特定时间使用 C# 反应式扩展调用函数

只需使用接受DateTimeOffset值的计时器重载即可。您可以使用DeferRepeat来创建"绝对间隔"。

Observable.Defer(() => 
    DateTime.Now.Hour < 9
  ? Observable.Timer(DateTime.Today.AddHours(9))
  : DateTime.Now.Hour < 13
  ? Observable.Timer(DateTime.Today.AddHours(13))
  : Observable.Timer(DateTime.Today.AddDays(1).AddHours(9)))
          .Repeat()
          .Subscribe(...);

Rx 会尽其所能自动确保您的通知将在指定的确切日期和时间发生,即使对于计时器偏移以及在计时器持续时间之前系统时钟更改也是如此。

下面是一个扩展方法,可以进一步概括问题。

用法:

Observable2.Daily(TimeSpan.FromHours(9), TimeSpan.FromHours(13)).Subscribe(...);

定义:

public static partial class Observable2
{
  public static IObservable<long> Daily(params TimeSpan[] times)
  {
    Contract.Requires(times != null);
    Contract.Requires(Contract.ForAll(times, time => time > TimeSpan.Zero));
    Contract.Requires(Contract.ForAll(times, time => time.TotalDays < 1));
    return Daily(Scheduler.Default, times);
  }
  public static IObservable<long> Daily(IScheduler scheduler, params TimeSpan[] times)
  {
    Contract.Requires(times != null);
    Contract.Requires(Contract.ForAll(times, time => time > TimeSpan.Zero));
    Contract.Requires(Contract.ForAll(times, time => time.TotalDays < 1));
    if (times.Length == 0)
      return Observable.Never<long>();
    // Do not sort in place.
    var sortedTimes = times.ToList();
    sortedTimes.Sort();
    return Observable.Defer(() =>
      {
        var now = DateTime.Now;
        var next = sortedTimes.FirstOrDefault(time => now.TimeOfDay < time);
        var date = next > TimeSpan.Zero
                 ? now.Date.Add(next)
                 : now.Date.AddDays(1).Add(sortedTimes[0]);
        Debug.WriteLine("Next @" + date + " from " + sortedTimes.Aggregate("", (s, t) => s + t + ", "));
        return Observable.Timer(date, scheduler);
      })
      .Repeat()
      .Scan(-1L, (n, _) => n + 1);
  }
}

更新:

如果你想在你的方法中更加"实用",根据Jeroen Mostert的答案,将输入定义为迭代器块的无限序列,那么你可以使用Generate如下。

Observable.Generate(
  GetScheduleTimes().GetEnumerator(), 
  e => e.MoveNext(), 
  e => e, 
  e => e.Current, 
  e => e.Current);

Jeroen Mostert的回答(已删除(提供了GetScheduleTimes的示例实现,但基本上它只是一个迭代器块,在while循环中产生无限序列的DateTimeOffset值,每个循环将值的天增加1。