如何使用Rx按计划交付事件

本文关键字:交付 事件 按计划 Rx 何使用 | 更新日期: 2023-09-27 18:05:27

我在c#中使用响应式扩展。我想要几个线程在ConcurrentQueue上排队。然后我想订阅那个队列,但每1秒只能得到1个元素。这个答案几乎有效,但当我向队列中添加更多元素时就不行了。

给定一个int型队列:[1,2,3,4,5,6]。我希望Subscribe(Console.WriteLine)每秒打印一个值。当Rx打印这些数字时,我想从另一个线程向队列中添加更多的整型数。什么好主意吗?

如何使用Rx按计划交付事件

要使输入流的输出速度不超过Timespan间隔所描述的速率,请使用以下命令:

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

请看这里的解释。下面是一个为快速脱队列的并发队列量身定制的示例实现。请注意,使用IEnumerable<T>ToObservable扩展将ConcurrentQueue<T>直接转换为可观察对象将是一个错误,因为不幸的是,这个可观察对象一旦队列为空就会完成。很烦人的是——至少在我看来——ConcurrentQueue<T>上没有异步脱队列,所以我不得不引入一个轮询机制。其他抽象(例如BlockingCollection<T>)可能更适合您!

public static class ObservableExtensions
{
    public static IObservable<T> Pace<T>(this ConcurrentQueue<T> queue,
                                         TimeSpan interval)
    {
        var source = Observable.Create<T>(async (o, ct) => {
            while(!ct.IsCancellationRequested)
            {
                T next; 
                while(queue.TryDequeue(out next))
                    o.OnNext(next);
                // You might want to use some arbitrary shorter interval here
                // to allow the stream to resume after a long delay in source
                // events more promptly    
                await Task.Delay(interval, ct);
            }   
            ct.ThrowIfCancellationRequested();
        });
        // this does the pacing
        return source.Select(i => Observable.Empty<T>()
                     .Delay(interval)
                     .StartWith(i)).Concat()
                     .Publish().RefCount(); // to allow multiple subscribers    
    }
}

使用例子:

public static void Main()
{
    var queue = new ConcurrentQueue<int>();
    var stopwatch = new Stopwatch();
    queue.Pace(TimeSpan.FromSeconds(1))
        .Subscribe(
            x => Console.WriteLine(stopwatch.ElapsedMilliseconds + ": x" + x),
            e => Console.WriteLine(e.Message),
            () => Console.WriteLine("Done"));
    stopwatch.Start();
    queue.Enqueue(1);
    queue.Enqueue(2);
    Thread.Sleep(500);
    queue.Enqueue(3);
    Thread.Sleep(5000);
    queue.Enqueue(4);
    queue.Enqueue(5);
    queue.Enqueue(6);
    Console.ReadLine();
}

也许你会对其中一个Observable感到满意。缓冲过载。但是考虑不要对长时间运行的订阅使用缓冲,因为缓冲的元素会给RAM带来压力。

你也可以用Observable来构建你自己的扩展方法。生成

void Main()
{
    var queue = new ConcurrentQueue<int>();
    queue.Enqueue(1);
    queue.Enqueue(2);
    queue.Enqueue(3);
    queue.Enqueue(4);
    queue.ObserveEach(TimeSpan.FromSeconds(1)).DumpLive("queue");
}
// Define other methods and classes here
public static class Ex {
    public static IObservable<T> ObserveConcurrentQueue<T>(this ConcurrentQueue<T> queue, TimeSpan period) 
    {
        return Observable
            .Generate(
                queue, 
                x => true,
                x => x, 
                x => x.DequeueOrDefault(), 
                x => period)
            .Where(x => !x.Equals(default(T)));
    }
    public static T DequeueOrDefault<T>(this ConcurrentQueue<T> queue)
    {
        T result;
        if (queue.TryDequeue(out result))
            return result;
        else
            return default(T);
    }
}