使用 RX 在不同时间触发事件

本文关键字:事件 同时间 RX 使用 | 更新日期: 2023-09-27 18:33:11

我有大量的简单对类集合:

public class Pair { public DateTime Timestamp; public double Value; }

它们按时间戳升序排序。我想在适当的时间为列表中的每个项目触发一个具有值(例如,Action)的事件。时间是过去的,所以我需要规范化时间戳,以便列表中的第一个是"现在"。我们是否可以使用反应式扩展进行设置,以便在两个项目之间的时间差之后触发下一个事件?

使用 RX 在不同时间触发事件

假设pairs是你的序列:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable();

现在obs成对作为有序可观察量。

Observable.Zip(
    obs,
    obs.Take(1).Concat(obs),
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp)
      .Select(_ => pair1.Value))
.Concat()
.Subscribe(/* Do something here */);

拉链负责将绝对时间转换为偏移量。它将获取序列并将其与自身连接,但偏移量为 1,如下所示

Original 1--2--4--7--11
Offset   1--1--2--4--7--11
Joined   0--1--2--3--4

然后将此新值放入Observable.Timer以将其延迟适当的量。最后的Concat将结果从IObservable<IObservable<double>>展平为IObservable<double>。这假定您的序列已排序。

如果通过"使用 Rx",您允许我只使用 Rx 调度程序,那么这是一个非常简单的解决方案:

Action<double> action =
    x =>
        Console.WriteLine(x);
var ts0 = pairs.Select(p => p.Timestamp).Min();
pairs
    .ForEach(p => 
        Scheduler
            .ThreadPool
            .Schedule(
                p.Timestamp.Subtract(ts0),
                () => action(p.Value)));

这使用 System.Interactive 扩展ForEach ,但您可以使用常规foreach循环来加载调度程序。

我已经使用以下虚拟数据测试了代码:

var pairs = new []
{
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, },
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, },
};

我希望这有所帮助。

我认为这个问题很有趣,这将是我第一次尝试。

static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent)
{
  if (pairs == null || !pairs.Any() || pairEvent == null)
    return;
  // if we can promise the pairs are already sorted
  // obviously we don't need this next line
  pairs = pairs.OrderBy(p => p.Timestamp);
  var first = pairs .First().Timestamp;
  var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p });
  var start = DateTime.Now;
  double interval = 250; // 1/4 second
  Timer timer = new Timer(interval);
  timer.AutoReset = true;
  timer.Elapsed += (sender, elapsedArgs) =>
  {
    var signalTime = elapsedArgs.SignalTime;
    var elapsedTime = (signalTime - start);
    var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair);
    wrapped = wrapped.Skip(pairsToTrigger.Count());
    if (!wrapped.Any())
      timer.Stop();
    foreach (var pair in pairsToTrigger)
      pairEvent(pair.Value);    
  };
  timer.Start();
}