使用 Rx,如何在我的订阅方法运行时忽略除最新值之外的所有值

本文关键字:最新 运行时 Rx 我的 方法 使用 | 更新日期: 2023-09-27 18:37:11

使用反应式扩展,我想忽略来自我的事件流的消息,这些消息在我的Subscribe方法运行时发生。 也就是说,有时我处理消息所需的时间比消息之间的时间要长,因此我想丢弃我没有时间处理的消息。

但是,当我的Subscribe方法完成时,如果有任何消息通过,我想处理最后一个消息。所以我总是处理最新的消息。

所以,如果我有一些代码可以:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

如果我们假设"100"需要很长时间来处理。然后,我希望在"2"完成时处理"100"。应忽略"1",因为在"100"仍在处理时,它已被"2"取代。

下面是我希望使用后台任务的结果示例,Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

但是,Latest() 是一个阻塞调用,我不希望有一个线程坐在那里等待这样的下一个值(消息之间有时会有很长的间隔)。

我还可以使用 TPL 数据流中的BroadcastBlock获得我想要的结果,如下所示:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

但这感觉应该可以直接在 Rx 中实现。最好的方法是什么?

使用 Rx,如何在我的订阅方法运行时忽略除最新值之外的所有值

这是一个类似于 Dave 的方法,但改用Sample(这比缓冲区更合适)。我包含一个与我添加到 Dave 答案中的扩展方法类似的扩展方法。

扩展名:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();
    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });
    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));
    return sub;
}

请注意,它更简单,并且没有触发的"空"缓冲区。发送到操作的第一个元素实际上来自流本身。

用法很简单:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});
messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

结果:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10

感谢 Lee Campbell(Intro To Rx 成名),我现在有一个使用此扩展方法的工作解决方案:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }
            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

这是使用"just"Rx的尝试。计时器和订阅者通过观察线程池保持独立,我使用主题来提供完成任务的反馈。

我不认为这是一个简单的解决方案,但我希望它能给你一些改进的想法。

messages.
    Buffer(() => feedback).
    Select(l => l.LastOrDefault()).
    ObserveOn(Scheduler.ThreadPool).
    Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
        feedback.OnNext(Unit.Default);
    });
feedback.OnNext(Unit.Default);

有一个小问题 - 缓冲区在空时首先关闭,因此它会生成默认值。您可以通过在第一条消息之后进行反馈来解决它。


这里是作为扩展函数:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var feedback = new Subject<Unit>();
    var sub = source.
        Buffer(() => feedback).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l.LastOrDefault());
            feedback.OnNext(Unit.Default);
        });
    feedback.OnNext(Unit.Default);
    return sub;
}

和用法:

    messages.SubscribeWithoutOverlap(n =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(n);
    });

使用 Observable.Switch 的示例。它还处理当您完成任务但什么都没有时的情况在队列中。

using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace System.Reactive
{
    public static class RXX
    {
        public static IDisposable SubscribeWithoutOverlap<T>
        ( this IObservable<T> source
        , Action<T> action
        , IScheduler scheduler = null)
        {
            var sampler = new Subject<Unit>();
            scheduler = scheduler ?? Scheduler.Default;
            var p = source.Publish();
            var connection = p.Connect();
            var subscription = sampler.Select(x=>p.Take(1))
                .Switch()
                .ObserveOn(scheduler)
                .Subscribe(l =>
                {
                    action(l);
                    sampler.OnNext(Unit.Default);
                });
            sampler.OnNext(Unit.Default);
            return new CompositeDisposable(connection, subscription);
        }
    }
}

我写了一篇关于这个问题的博客文章,其中包含一个使用 CAS 而不是锁并避免递归的解决方案。代码如下,但您可以在此处找到完整的解释:http://www.zerobugbuild.com/?p=192

public static IObservable<TSource> ObserveLatestOn<TSource>(
    this IObservable<TSource> source,
    IScheduler scheduler)
{
    return Observable.Create<TSource>(observer =>
    {
        Notification<TSource> pendingNotification = null;
        var cancelable = new MultipleAssignmentDisposable();
        var sourceSubscription = source.Materialize()
            .Subscribe(notification =>
            {
                var previousNotification = Interlocked.Exchange(
                    ref pendingNotification, notification);
                if (previousNotification != null) return;
                cancelable.Disposable = scheduler.Schedule(() =>
                    {
                        var notificationToSend = Interlocked.Exchange(
                            ref pendingNotification, null);
                        notificationToSend.Accept(observer);
                    });
            });
            return new CompositeDisposable(sourceSubscription, cancelable);
    });
}

这是一个基于 Task 的实现,具有取消语义,不使用主题。如果需要,调用 dispose 允许订阅的操作取消处理。

    public static IDisposable SampleSubscribe<T>(this IObservable<T> observable, Action<T, CancellationToken> action)
    {
        var cancellation = new CancellationDisposable();
        var token = cancellation.Token;
        Task task = null;
        return new CompositeDisposable(
            cancellation,
            observable.Subscribe(value =>
            {
                if (task == null || task.IsCompleted)
                    task = Task.Factory.StartNew(() => action(value, token), token);
            })
        );
    }

这是一个简单的测试:

Observable.Interval(TimeSpan.FromMilliseconds(150))
                      .SampleSubscribe((v, ct) =>
                      {   
                          //cbeck for cancellation, do work
                          for (int i = 0; i < 10 && !ct.IsCancellationRequested; i++)
                              Thread.Sleep(100);
                          Console.WriteLine(v);
                      });

输出:

0
7
14
21
28
35

使用 Rx 2.0 RC,您可以使用 Chunkify 获取 IE无数列表,每个列表都包含自上次 MoveNext 以来观察到的内容。

然后,您可以使用ToObservable将其转换回 IObservable,并且只注意每个非空列表中的最后一个条目。

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
messages.Chunkify()
        .ToObservable(Scheduler.TaskPool)
        .Where(list => list.Any())
        .Select(list => list.Last())
        .Subscribe(n =>
        {
          Thread.Sleep(TimeSpan.FromMilliseconds(250));
          Console.WriteLine(n);
        });

刚刚完成(并且已经完全修改了)我自己的问题解决方案,我计划在生产中使用。

除非调度程序使用当前线程,否则从源调用OnNextOnCompletedOnError应立即返回;如果观察者忙于处理以前的通知,它们将进入具有可指定最大大小的队列,每当处理了以前的通知时,他们都会收到通知。如果队列已满,则丢弃最近的项目。因此,最大队列大小为 0 将忽略观察者繁忙时传入的所有项目;大小为 1 将始终允许观察最新项目;最大int.MaxValue的尺寸使消费者忙碌,直到赶上生产者。

如果调度程序支持长时间运行(即给你一个自己的线程),我调度一个循环来通知观察者;否则我使用递归调度。

这是代码。任何意见不胜感激。

partial class MoreObservables
{
    /// <summary>
    /// Avoids backpressure by enqueuing items when the <paramref name="source"/> produces them more rapidly than the observer can process.
    /// </summary>
    /// <param name="source">The source sequence.</param>
    /// <param name="maxQueueSize">Maximum queue size. If the queue gets full, less recent items are discarded from the queue.</param>
    /// <param name="scheduler">Optional, default: <see cref="Scheduler.Default"/>: <see cref="IScheduler"/> on which to observe notifications.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxQueueSize"/> is negative.</exception>
    /// <remarks>
    /// A <paramref name="maxQueueSize"/> of 0 observes items only if the subscriber is ready.
    /// A <paramref name="maxQueueSize"/> of 1 guarantees to observe the last item in the sequence, if any.
    /// To observe the whole source sequence, specify <see cref="int.MaxValue"/>.
    /// </remarks>
    public static IObservable<TSource> Latest<TSource>(this IObservable<TSource> source, int maxQueueSize, IScheduler scheduler = null)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (maxQueueSize < 0) throw new ArgumentOutOfRangeException(nameof(maxQueueSize));
        if (scheduler == null) scheduler = Scheduler.Default;
        return Observable.Create<TSource>(observer => LatestImpl<TSource>.Subscribe(source, maxQueueSize, scheduler, observer));
    }
    private static class LatestImpl<TSource>
    {
        public static IDisposable Subscribe(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
        {
            if (observer == null) throw new ArgumentNullException(nameof(observer));
            var longrunningScheduler = scheduler.AsLongRunning();
            if (longrunningScheduler != null)
                return new LoopSubscription(source, maxQueueSize, longrunningScheduler, observer);
            return new RecursiveSubscription(source, maxQueueSize, scheduler, observer);
        }
        #region Subscriptions
        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies in a loop.
        /// </summary>
        private sealed class LoopSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Head, // next notification is in _head
                Queue, // next notifications are in _queue, followed by _completion
                Disposed, // disposed
            }
            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly IObserver<TSource> _observer;
            private State _state;
            private TSource _head; // item in front of the queue
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification
            public LoopSubscription(IObservable<TSource> source, int maxQueueSize, ISchedulerLongRunning scheduler, IObserver<TSource> observer)
            {
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                scheduler.ScheduleLongRunning(_ => Loop());
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }
            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _head = value;
                            _state = State.Head;
                            Monitor.Pulse(_subscription);
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }
            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _state = State.Queue;
                            Monitor.Pulse(_subscription);
                            _subscription.Dispose();
                            break;
                        case State.Head:
                        case State.Queue:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }
            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    _head = default(TSource);
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    Monitor.Pulse(_subscription);
                    _subscription.Dispose();
                }
            }
            private void Loop()
            {
                try
                {
                    while (true) // overall loop for all notifications
                    {
                        // next notification to emit
                        Notification<TSource> completion;
                        TSource next; // iff completion == null
                        lock (_subscription)
                        {
                            while (true)
                            {
                                while (_state == State.Idle)
                                    Monitor.Wait(_subscription);
                                if (_state == State.Head)
                                {
                                    completion = null;
                                    next = _head;
                                    _head = default(TSource);
                                    _state = State.Queue;
                                    break;
                                }
                                if (_state == State.Queue)
                                {
                                    if (!_queue.IsEmpty)
                                    {
                                        completion = null;
                                        next = _queue.Dequeue(); // assumption: this never throws
                                        break;
                                    }
                                    if (_completion != null)
                                    {
                                        completion = _completion;
                                        next = default(TSource);
                                        break;
                                    }
                                    _state = State.Idle;
                                    continue;
                                }
                                Debug.Assert(_state == State.Disposed);
                                return;
                            }
                        }
                        if (completion != null)
                        {
                            completion.Accept(_observer);
                            return;
                        }
                        _observer.OnNext(next);
                    }
                }
                finally { Dispose(); }
            }
        }
        /// <summary>
        /// Represents a subscription to <see cref="Latest{TSource}(IObservable{TSource}, int, IScheduler)"/> which notifies recursively.
        /// </summary>
        private sealed class RecursiveSubscription : IDisposable
        {
            private enum State
            {
                Idle, // nothing to notify
                Scheduled, // emitter scheduled or executing
                Disposed, // disposed
            }
            private readonly SingleAssignmentDisposable _subscription = new SingleAssignmentDisposable();
            private readonly MultipleAssignmentDisposable _emitter = new MultipleAssignmentDisposable(); // scheduled emit action
            private readonly IScheduler _scheduler;
            private readonly IObserver<TSource> _observer;
            private State _state;
            private IQueue _queue; // queued items
            private Notification<TSource> _completion; // completion notification
            public RecursiveSubscription(IObservable<TSource> source, int maxQueueSize, IScheduler scheduler, IObserver<TSource> observer)
            {
                _scheduler = scheduler;
                _observer = observer;
                _queue = Queue.Create(maxQueueSize);
                _subscription.Disposable = source.Subscribe(
                    OnNext,
                    error => OnCompletion(Notification.CreateOnError<TSource>(error)),
                    () => OnCompletion(Notification.CreateOnCompleted<TSource>()));
            }
            private void OnNext(TSource value)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _emitter.Disposable = _scheduler.Schedule(value, EmitNext);
                            _state = State.Scheduled;
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            try { _queue.Enqueue(value); }
                            catch (Exception error) // probably OutOfMemoryException
                            {
                                _completion = Notification.CreateOnError<TSource>(error);
                                _subscription.Dispose();
                            }
                            break;
                    }
                }
            }
            private void OnCompletion(Notification<TSource> completion)
            {
                lock (_subscription)
                {
                    switch (_state)
                    {
                        case State.Idle:
                            _completion = completion;
                            _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(completion));
                            _state = State.Scheduled;
                            _subscription.Dispose();
                            break;
                        case State.Scheduled:
                            if (_completion != null) return;
                            _completion = completion;
                            _subscription.Dispose();
                            break;
                    }
                }
            }
            public void Dispose()
            {
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    _emitter.Dispose();
                    _queue = null;
                    _completion = null;
                    _state = State.Disposed;
                    _subscription.Dispose();
                }
            }
            private void EmitNext(TSource value, Action<TSource> self)
            {
                try { _observer.OnNext(value); }
                catch { Dispose(); return; }
                lock (_subscription)
                {
                    if (_state == State.Disposed) return;
                    Debug.Assert(_state == State.Scheduled);
                    if (!_queue.IsEmpty)
                        self(_queue.Dequeue());
                    else if (_completion != null)
                        _emitter.Disposable = _scheduler.Schedule(() => EmitCompletion(_completion));
                    else
                        _state = State.Idle;
                }
            }
            private void EmitCompletion(Notification<TSource> completion)
            {
                try { completion.Accept(_observer); }
                finally { Dispose(); }
            }
        }
        #endregion
        #region IQueue
        /// <summary>
        /// FIFO queue that discards least recent items if size limit is reached.
        /// </summary>
        private interface IQueue
        {
            bool IsEmpty { get; }
            void Enqueue(TSource item);
            TSource Dequeue();
        }
        /// <summary>
        /// <see cref="IQueue"/> implementations.
        /// </summary>
        private static class Queue
        {
            public static IQueue Create(int maxSize)
            {
                switch (maxSize)
                {
                    case 0: return Zero.Instance;
                    case 1: return new One();
                    default: return new Many(maxSize);
                }
            }
            private sealed class Zero : IQueue
            {
                // ReSharper disable once StaticMemberInGenericType
                public static Zero Instance { get; } = new Zero();
                private Zero() { }
                public bool IsEmpty => true;
                public void Enqueue(TSource item) { }
                public TSource Dequeue() { throw new InvalidOperationException(); }
            }
            private sealed class One : IQueue
            {
                private TSource _item;
                public bool IsEmpty { get; private set; } = true;
                public void Enqueue(TSource item)
                {
                    _item = item;
                    IsEmpty = false;
                }
                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();
                    var item = _item;
                    _item = default(TSource);
                    IsEmpty = true;
                    return item;
                }
            }
            private sealed class Many : IQueue
            {
                private readonly int _maxSize, _initialSize;
                private int _deq, _enq; // indices of deque and enqueu positions
                private TSource[] _buffer;
                public Many(int maxSize)
                {
                    if (maxSize < 2) throw new ArgumentOutOfRangeException(nameof(maxSize));
                    _maxSize = maxSize;
                    if (maxSize == int.MaxValue)
                        _initialSize = 4;
                    else
                    {
                        // choose an initial size that won't get us too close to maxSize when doubling
                        _initialSize = maxSize;
                        while (_initialSize >= 7)
                            _initialSize = (_initialSize + 1) / 2;
                    }
                }
                public bool IsEmpty { get; private set; } = true;
                public void Enqueue(TSource item)
                {
                    if (IsEmpty)
                    {
                        if (_buffer == null) _buffer = new TSource[_initialSize];
                        _buffer[0] = item;
                        _deq = 0;
                        _enq = 1;
                        IsEmpty = false;
                        return;
                    }
                    if (_deq == _enq) // full
                    {
                        if (_buffer.Length == _maxSize) // overwrite least recent
                        {
                            _buffer[_enq] = item;
                            if (++_enq == _buffer.Length) _enq = 0;
                            _deq = _enq;
                            return;
                        }
                        // increse buffer size
                        var newSize = _buffer.Length >= _maxSize / 2 ? _maxSize : 2 * _buffer.Length;
                        var newBuffer = new TSource[newSize];
                        var count = _buffer.Length - _deq;
                        Array.Copy(_buffer, _deq, newBuffer, 0, count);
                        Array.Copy(_buffer, 0, newBuffer, count, _deq);
                        _deq = 0;
                        _enq = _buffer.Length;
                        _buffer = newBuffer;
                    }
                    _buffer[_enq] = item;
                    if (++_enq == _buffer.Length) _enq = 0;
                }
                public TSource Dequeue()
                {
                    if (IsEmpty) throw new InvalidOperationException();
                    var result = ReadAndClear(ref _buffer[_deq]);
                    if (++_deq == _buffer.Length) _deq = 0;
                    if (_deq == _enq)
                    {
                        IsEmpty = true;
                        if (_buffer.Length > _initialSize) _buffer = null;
                    }
                    return result;
                }
                private static TSource ReadAndClear(ref TSource item)
                {
                    var result = item;
                    item = default(TSource);
                    return result;
                }
            }
        }
        #endregion
    }
}

另一种解决方案。

这并不漂亮,因为它混合了TaskObservable,因此无法使用ReactiveTest进行真正测试(尽管说实话,我不确定如何实现"慢"订阅ReactiveTest者)。

public static IObservable<T> ShedLoad<T>(this IObservable<T> source)
{
    return Observable.Create<T>(observer =>
    {
        Task task = Task.FromResult(0);
        return source.Subscribe(t =>
        {
            if(task.IsCompleted)
                task = Task.Run(() => observer.OnNext(t));
            else
                Debug.WriteLine("Skip, task not finished");
        }, observer.OnError, observer.OnCompleted);
    });
}

我猜那里可能存在竞争条件,但在我看来,如果我们正处于因为发展太快而放弃东西的阶段,我不介意放弃太多或太少。哦,每个OnNext(可能)在不同的线程上调用(我想我可以在Create的背面放一个Synchronize)。

承认我无法让 Materialize 扩展正常工作(我将其连接到FromEventPattern(MouseMove)然后订阅了一个故意变慢的订阅,奇怪的是它会让突发事件通过,而不是一次一个)