创建一个依赖于间隔状态的可观察对象
本文关键字:状态 对象 观察 依赖于 一个 创建 | 更新日期: 2023-09-27 17:58:28
我正在尝试编写一个类,它将每隔x秒从数据库中查询一组数据。
问题是,这个可观测对象会记住它以前的状态,而这个状态用于确定可观测对象上实际发送的内容。
例如,我想要一个
IObservable<SomeEntity> AddedEntities { get; private set; }
IObservable<SomeEntity> ChangedEntities { get; private set; }
IObservable<SomeEntity> DeletedEntities { get; private set; }
我的问题是我一直在阅读http://www.introtorx.com/他们说使用Subject是个坏主意。相反,我应该使用Observable.Create方法。
我的问题是,我只想查询数据库一次,然后将相关信息发送回正确的可观察对象,但我不希望每个订阅都能启动我自己的定时器,并在每次订阅每个IOobservable时每5秒查询一次数据库。
我只做了一个可观察的东西,它会返回一个新的模型,并将其作为一个集合进行更改:
IObservable<EntityChangeSet> Changes {get; private set;}
public class EntityChangeSet
{
Public IEnumerable<SomeEntity> Added {get; set;}
Public IEnumerable<SomeEntity> Changed {get; set;}
Public IEnumerable<SomeEntity> Deleted {get; set;}
}
我也对任何类似的解决方案持开放态度。
到目前为止,我拥有的是:
public class IntervalChangeReader
{
// My state between ticks.
private IEnumerable<SomeEntity> knowEntities;
// Reads the data from the db and uses knowEntities to determine adds, changes,
// and deletes which are exposed through properties.
private DbReaderAndChangeChecker checker;
IDisposeable timerCancel;
public IntervalChangeReader(DbReaderAndChangeChecker checker)
{
this.checker = checker;
}
public IObservable<EntityChangeSet> Changes { get; private set; }
public Start(int seconds)
{
this.timerCancel = Observable.Interval(new TimeSpan.FromSeconds(seconds)).Subscribe(
x =>
{
var results = this.checker.Refresh(this.knownEntities);
// Update this.knownEntities with results.
// Inform produce a value on the observable ????
// I could call .OnNext If I had a subject exposed for my Observable.
}
}
public Stop()
{
this.timerCancel.Dispose();
// Complete on all subscriptions?
// If I were using subjects I could just call .OnComplete here.
}
}
我如何在不使用主题的情况下创建Observable,以及如何通过它传递结果?我只想用一个定时器来完成这一切,而不是每次订阅我的观察器都有一个定时器。
我相信你想要的是ConnectableObservable
方法。。。玩这个(注意,在电话上,所以我的语法可能有点偏离)
public void Start(int seconds)
{
// Every tick, generate a "changeset"
var changeCentral =
from tick in Observable.Interval(TimeSpan.FromSeconds(seconds))
let results = this.checker.Refresh(this.knowEntities)
select new EntityChangeSet(results);
// Publish means one subscription for all "connecting" subscribers
// RefCount means so long as one subscriber is subscribed, the subscription remains alive
var connector = changeCentral.Publish().RefCount();
Changes = connector;
}
我想我已经找到了解决问题的方法。
public class IntervalChangeReader
{
// My state between ticks.
private IEnumerable<SomeEntity> knowEntities;
// Reads the data from the db and uses knowEntities to determine adds, changes,
// and deletes which are exposed through properties.
private DbReaderAndChangeChecker checker;
private EntityChangeSet lastChangeResult;
public IntervalChangeReader(DbReaderAndChangeChecker checker)
{
this.checker = checker;
// I like to use Do instead of just combining this all into Select to make the side effect obvious.
this.Changes = Observable.Interval(TimeSpan.FromSeconds(seconds), scheduler).Do(
x =>
{
var changes = DbReaderAndChangeChecker.Refresh(this.knownXRefs);
// Make changes to knownXRefs which is my state.
this.knownXRefs.AddRange(changes.Added);
this.knownXRefs.RemoveAll(y => changes.Removed.Any(z => z.Id == y.Id));
this.knownXRefs.RemoveAll(y => changes.Changed.Any(z => z.Id == y.Id));
this.knownXRefs.AddRange(changes.Changed);
lastChangeResult = changes;
}).Select(x => this.lastChangeResult);
}
public IObservable<EntityChangeSet> Changes { get; private set; }
}