Rx for.Net:如何将扫描与Throttle相结合

本文关键字:扫描 Throttle 相结合 for Net Rx | 更新日期: 2023-09-27 18:29:24

我的问题是:对于给定的事件序列,我希望缓存它们的值,直到流中出现暂停。然后,我将批量处理所有缓存的数据,并清除缓存状态。

一个简单的方法是(不是一个工作代码,可能存在一些错误):

struct FlaggedData
{
    public EventData Data { get; set; }
    public bool Reset { get; set; }
}
...
IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();
var flaggedDataStream = eventsStream
    .Select(data => new FlaggedData { Data = data })
    .Merge(resetSignal)
    .Scan(
        new List<EventData>(),
        (cache, flaggedData) =>
        {
            if (!flaggedData.Reset())
            {
                cache.Add(flaggedData.Data);
                return cache;
            }
            return new List<EventData>();
        })
    .Throttle(SomePeriodOfTime)
    .Subscribe(batch => 
        {
            resetSignal.OnNext(new FlaggedData { Reset = true});
            ProcessBatch(batch);
        });

因此,在这里,在收到任何要处理的批后,我请求重置缓存。问题是,由于Throttle,缓存中可能有一些数据(或者我相信是这样),在这种情况下会丢失。

我想要的是一些操作,比如:

ScanWithThrottling<TAccumulate, TSource>(
    IObservable<TSource> source,
    Func<TAccumulate, TSource, TAccumulate> aggregate,
    TimeSpan throttlingSpan)

其每次调用其订户的CCD_ 2时返回将重置累积值的可观测值。

当然,我可以写一个我自己的扩展,但问题是是否有某种方法可以使用标准的Rx操作实现同样的效果。

Rx for.Net:如何将扫描与Throttle相结合

我认为这里有一个简单的方法。使用Buffer()来缓冲基于如下节流阀的元素:

var buffered = source.Publish(ps =>        
    ps.Buffer(() => ps.Throttle(SomePeriodOfTime)));

这将缓冲元素,直到出现SomePeriodOfTime的间隙,并将它们作为列表显示。无需担心"重置"方面,而且不会丢失元素。

Publish的使用确保了对Buffer和每个Throttle都可以使用的源事件有一个单独的共享订阅。油门是缓冲器关闭功能,提供一个信号,指示应启动新的缓冲器。

这是一个可测试的版本——我只是在这里转储每个缓冲区的长度,并使用Timestamp添加定时信息,但它是原始缓冲流上的IList<T>。请注意如何将调度程序作为参数提供给基于时间的操作以启用测试。

注意,您将需要nuget包rx测试来运行此示例,以引入rx测试框架并获得TestSchedulerReactiveTest类型:

void Main()
{
    var scenarios = new Scenarios();
    scenarios.Scenario1();
}
public class Scenarios : ReactiveTest
{
    public void Scenario1()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateHotObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(800, 4),
            OnNext(900, 5),
            OnNext(1400, 6),
            OnNext(1600, 7),
            OnNext(1700, 8),
            OnNext(1800, 9));
    var duration = TimeSpan.FromTicks(300);
    var buffered = source.Publish(ps =>        
        ps.Buffer(() => ps.Throttle(duration, scheduler)));
        buffered.Timestamp(scheduler).Subscribe(
            x => Console.WriteLine("Timestamp: {0} Value: {1}",
                 x.Timestamp.Ticks, x.Value.Count()));
        scheduler.Start();
    }
}

本质上,您似乎想要一个缓冲而非丢弃的Throttle

为了解决这个问题,让我们思考一下Throttle是如何工作的:

  1. 当一个值到达时,缓存它(替换以前的任何值)并启动计时器
  2. 如果另一个值在计时器过期之前到达,请取消计时器并转到#1
  3. 否则,当计时器超时时,从缓存中推送该值

您的规格:

  1. 当一个值到达时,将其附加到缓冲区并启动计时器
  2. 如果另一个值在计时器过期之前到达,请取消计时器并转到#1
  3. 否则,当计时器超时时,推送缓冲区并创建一个新的缓冲区

因此:

var throttled = source.Publish(p => 
  from value in p
  // TODO: Add to buffer (side effect)
  from _ in Observable.Timer(period, scheduler).TakeUntil(p)
  select buffer);  // TODO: Create new buffer (side effect + contention)

请注意,如果指定的scheduler引入了并发性,那么假设的buffer的竞争条件和可能的争用。

如果p观察到新的valueTimer已经过去,那么在天真的实现中,在推送OnNext0并创建新buffer之前,最新的value可能泄漏到buffer中。这可能是可以接受的,尽管无论你如何削减同步都是必要的。在我看来,除非你用时间戳扩展通知,或者保证所有操作的线程相关性,否则不可能获得你想要的确切语义。

也许它作为一个好的广义算子是有意义的。对我来说,它似乎足够正交,可以作为Rx基元包括在内,除非我只是在分析中出错。请考虑添加一个工作项。

我还清楚地记得在Rx MSDN论坛上讨论过类似的运营商,可能不止一次。也许值得搜索这些讨论。

编辑:我已经改变了在本例中使用同步的想法,因为无论如何,假设scheduler引入了并发,竞争条件在这里是不可避免的

未测试示例:

public static IObservable<IList<TSource>> ThrottleBuffered<TSource>(
  this IObservable<TSource> source, 
  TimeSpan period,
  IScheduler scheduler)
{
  return source.Publish(p => 
  {
    var buffer = new List<TSource>();
    return (from _ in p.Do(buffer.Add)
            from __ in Observable.Timer(period, scheduler).TakeUntil(p)
            .Do(_ => buffer = new List<TSource>())
            select buffer);
  });
}