使用可观察到的.在不活动或计数后执行操作

本文关键字:操作 执行 不活动 观察 | 更新日期: 2023-09-27 18:10:50

我有一个由事件模式创建的可观察流,如下所示。

var keyspaceStream = Observable.FromEventPattern<RedisSubscriptionReceivedEventArgs>(
            h => keyspaceMonitor.KeySpaceChanged += h,
            h => keyspaceMonitor.KeySpaceChanged -= h);

我想做的是订阅流并执行一个方法,当有10秒的不活动(没有事件发生)或100个事件已经触发而没有执行方法。这是为了避免事件每5秒触发一次,而onNext方法从未被调用的情况。

我怎样才能做到这一点?我知道如何做第一部分(见下文),但我不知道如何做计数逻辑。注意,我已经知道如何订阅流。

var throttledStream = keyspaceStream.Throttle(TimeSpan.FromSeconds(10));

任何帮助将非常感激!谢谢你。

使用可观察到的.在不活动或计数后执行操作

Buffer与自定义bufferClosingSelector一起使用。这里的想法是,每个缓冲区应该在maxDurationmaxCount项之后关闭,以较早出现的为准。每关闭一个缓冲区,就会打开一个新的缓冲区。

var maxDuration = TimeSpan.FromSeconds(10);
var maxCount = 100;
var throttledStream = keyspaceStream.Publish(o =>
{
    var reachedMaxDuration = o
        .Select(_ => Observable.Timer(maxDuration, scheduler))
        .Switch();
    return o.Buffer(() => o
        .TakeUntil(reachedMaxDuration)
        .Take(maxCount)
        .LastOrDefaultAsync());
});

我假设您提供了IScheduler schedulerthrottledStream的类型为IObservable<IList<EventPattern<RedisSubscriptionReceivedEventArgs>>>