反应式扩展会缓冲事件,直到请求为止

本文关键字:请求 扩展 缓冲 事件 反应式 | 更新日期: 2023-09-27 18:36:15

我准备了一个演示应用程序。

using System.Collections.Concurrent;
using System.Reactive.Linq;
class Program
{
    static void Main(string[] args)
    {
        var stored = new ConcurrentQueue<long>();
        Observable.Interval(TimeSpan.FromMilliseconds(20))
            .Subscribe(it => stored.Enqueue(it));
        var random = new Random();
        Task.Run(async () =>
        {
            while (true)
            {
                await Task.Delay((int)(random.NextDouble() * 1000));
                var currBatch = stored.ToArray();
                for (int i = 0; i < currBatch.Length; i++)
                {
                    long res;
                    stored.TryDequeue(out res);
                }
                Console.WriteLine("[" + string.Join(",", currBatch) + "]");
            }
        });
        Console.ReadLine();
    }
}

它模拟以随机时间间隔触发的独立使用者。在实际应用中,事件源将来自文件系统,尽管可能是突发的。

这个东西的作用是在并发队列中无限期地存储事件,直到消费者决定使用收集的事件。

我有一种强烈的感觉,这个代码是不安全的。是否有可能以纯粹的 Rx 方式重现这种行为?

如果没有,你能建议更好/更安全的方法吗?

反应式扩展会缓冲事件,直到请求为止

你来了:

var producer = Observable.Interval(TimeSpan.FromMilliseconds(20));
var random = new Random();
Task.Run(async () =>
{
    var notify = new Subject<int>();
    producer.Window(() => notify)
        .SelectMany(ev => ev.ToList())
        .Subscribe(currBatch => Console.WriteLine("[" + string.Join(",", currBatch) + "]"));
    while (true)
    {
        await Task.Delay((int)(random.NextDouble() * 1000));
        notify.OnNext(1);
    }
});
Console.ReadLine();