如何使用Rx按计划交付事件
本文关键字:交付 事件 按计划 Rx 何使用 | 更新日期: 2023-09-27 18:05:27
我在c#中使用响应式扩展。我想要几个线程在ConcurrentQueue上排队。然后我想订阅那个队列,但每1秒只能得到1个元素。这个答案几乎有效,但当我向队列中添加更多元素时就不行了。
给定一个int型队列:[1,2,3,4,5,6]。我希望Subscribe(Console.WriteLine)每秒打印一个值。当Rx打印这些数字时,我想从另一个线程向队列中添加更多的整型数。什么好主意吗?
要使输入流的输出速度不超过Timespan
间隔所描述的速率,请使用以下命令:
var paced = input.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i)).Concat();
请看这里的解释。下面是一个为快速脱队列的并发队列量身定制的示例实现。请注意,使用IEnumerable<T>
的ToObservable
扩展将ConcurrentQueue<T>
直接转换为可观察对象将是一个错误,因为不幸的是,这个可观察对象一旦队列为空就会完成。很烦人的是——至少在我看来——ConcurrentQueue<T>
上没有异步脱队列,所以我不得不引入一个轮询机制。其他抽象(例如BlockingCollection<T>
)可能更适合您!
public static class ObservableExtensions
{
public static IObservable<T> Pace<T>(this ConcurrentQueue<T> queue,
TimeSpan interval)
{
var source = Observable.Create<T>(async (o, ct) => {
while(!ct.IsCancellationRequested)
{
T next;
while(queue.TryDequeue(out next))
o.OnNext(next);
// You might want to use some arbitrary shorter interval here
// to allow the stream to resume after a long delay in source
// events more promptly
await Task.Delay(interval, ct);
}
ct.ThrowIfCancellationRequested();
});
// this does the pacing
return source.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i)).Concat()
.Publish().RefCount(); // to allow multiple subscribers
}
}
使用例子:
public static void Main()
{
var queue = new ConcurrentQueue<int>();
var stopwatch = new Stopwatch();
queue.Pace(TimeSpan.FromSeconds(1))
.Subscribe(
x => Console.WriteLine(stopwatch.ElapsedMilliseconds + ": x" + x),
e => Console.WriteLine(e.Message),
() => Console.WriteLine("Done"));
stopwatch.Start();
queue.Enqueue(1);
queue.Enqueue(2);
Thread.Sleep(500);
queue.Enqueue(3);
Thread.Sleep(5000);
queue.Enqueue(4);
queue.Enqueue(5);
queue.Enqueue(6);
Console.ReadLine();
}
也许你会对其中一个Observable感到满意。缓冲过载。但是考虑不要对长时间运行的订阅使用缓冲,因为缓冲的元素会给RAM带来压力。
你也可以用Observable来构建你自己的扩展方法。生成
void Main()
{
var queue = new ConcurrentQueue<int>();
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
queue.Enqueue(4);
queue.ObserveEach(TimeSpan.FromSeconds(1)).DumpLive("queue");
}
// Define other methods and classes here
public static class Ex {
public static IObservable<T> ObserveConcurrentQueue<T>(this ConcurrentQueue<T> queue, TimeSpan period)
{
return Observable
.Generate(
queue,
x => true,
x => x,
x => x.DequeueOrDefault(),
x => period)
.Where(x => !x.Equals(default(T)));
}
public static T DequeueOrDefault<T>(this ConcurrentQueue<T> queue)
{
T result;
if (queue.TryDequeue(out result))
return result;
else
return default(T);
}
}