反应式扩展会缓冲事件,直到请求为止
本文关键字:请求 扩展 缓冲 事件 反应式 | 更新日期: 2023-09-27 18:36:15
我准备了一个演示应用程序。
using System.Collections.Concurrent;
using System.Reactive.Linq;
class Program
{
static void Main(string[] args)
{
var stored = new ConcurrentQueue<long>();
Observable.Interval(TimeSpan.FromMilliseconds(20))
.Subscribe(it => stored.Enqueue(it));
var random = new Random();
Task.Run(async () =>
{
while (true)
{
await Task.Delay((int)(random.NextDouble() * 1000));
var currBatch = stored.ToArray();
for (int i = 0; i < currBatch.Length; i++)
{
long res;
stored.TryDequeue(out res);
}
Console.WriteLine("[" + string.Join(",", currBatch) + "]");
}
});
Console.ReadLine();
}
}
它模拟以随机时间间隔触发的独立使用者。在实际应用中,事件源将来自文件系统,尽管可能是突发的。
这个东西的作用是在并发队列中无限期地存储事件,直到消费者决定使用收集的事件。
我有一种强烈的感觉,这个代码是不安全的。是否有可能以纯粹的 Rx 方式重现这种行为?
如果没有,你能建议更好/更安全的方法吗?
你来了:
var producer = Observable.Interval(TimeSpan.FromMilliseconds(20));
var random = new Random();
Task.Run(async () =>
{
var notify = new Subject<int>();
producer.Window(() => notify)
.SelectMany(ev => ev.ToList())
.Subscribe(currBatch => Console.WriteLine("[" + string.Join(",", currBatch) + "]"));
while (true)
{
await Task.Delay((int)(random.NextDouble() * 1000));
notify.OnNext(1);
}
});
Console.ReadLine();