如何在不维护状态的情况下获得可观察流中的项目数

本文关键字:观察 项目数 情况下 维护 状态 | 更新日期: 2023-09-27 17:59:23

如何使用Rx习语在任何给定时间点获得该校的学生人数,而不必自己在School类中保持状态?

using System;
using System.Reactive.Subjects;
namespace SchoolManagementSystem
{
    public class School
    {
        private ISubject<Student> _subject = null;
        private int _maxNumberOfSeats;
        private int _numberOfStudentsAdmitted;
        public string Name { get; set; }
        public School(string name, int maxNumberOfSeats)
        {
            Name = name;
            _maxNumberOfSeats = maxNumberOfSeats;
            _numberOfStudentsAdmitted = 0;
            _subject = new ReplaySubject<Student>();
        }
        public void AdmitStudent(Student student)
        {
            try
            {
                if (student == null)
                    throw new ArgumentNullException("student");
                if (_numberOfStudentsAdmitted == _maxNumberOfSeats)
                {
                    _subject.OnCompleted();
                }
                // Obviously can't do this because this will
                // create a kind of dead lock in that it will
                // wait for the _subject to complete, but I am 
                // using the same _subject to issue notifications.
                // _numberOfStudentsAdmitted = _subject.Count().Wait();

                // OR to keep track of state myself
                Interlocked.Increment(ref _numberOfStudentsAdmitted);
                _subject?.OnNext(student);
            }
            catch(Exception ex)
            {
                _subject.OnError(ex);
            }
        }
        public IObservable<Student> Students
        {
            get
            {
                return _subject;
            }
        }
    }
}

还是这与使用Rx设计的组件原理不一致?

这应该是客户端的责任吗(获取计数并在onNext处理程序中执行所有副作用)?可观察器应该简单地充当无状态信号源或门,就像硬件中断例程一样,简单地向CPU发出感兴趣的事情发生了吗?

在这种情况下,我们失去了可观察到的信号完成的标准。那么它应该如何知道何时完成呢?

如何在不维护状态的情况下获得可观察流中的项目数

您可以在_subject序列上使用Count()方法。它本身将创建一个可观察的序列,其中产生的每个值代表_subject中的最新学生总数。

然后,您可以对学生计数值的序列做出反应。Zip()操作在这方面可能很有用,因为当它的任何内部序列完成时,它在完成结果序列方面具有优势,您可以使用TakeWhile强制执行。

结果看起来像这个

Observable.Zip(
    _subject.Select(student => != null ? student ? throw new ArgumentNullException("student")), 
    _subject.Count().TakeWhile(studentCount => studentCount < _maxNumberOfSeats),
    (student, count) => student 
);

AdmitStudent方法体中所要做的只是将任何新学生推到具有_subject?.OnNext(student)的序列中(就像您已经做的那样),但没有额外的逻辑。您也可以对此进行一些修改,以确保一旦达到最大学生数,_subject本身也会完成,但我不确定您的业务规则,所以我将由您决定。

我可以推荐的最后一件事是,如果玩Rx类型的扩展,并看看这个网站,它可以大量使用它们。