Rx Window, Join, GroupJoin?

本文关键字:GroupJoin Join Rx Window | 更新日期: 2023-09-27 18:10:57

我已经生成/测试了两个可观察对象来组合执行单个查询。

一个用户可以拥有多个角色。每当角色id改变时,数据就需要更新。但是,只有当查询处于活动状态(当前有一些控件需要数据)时,数据才应该更新。

在挂起查询时也可能发生角色Id更改。当查询再次激活时,数据也应该加载。

//Tuple has the Id of the current Role and the time that the Id updated
IObservable<Tuple<Guid, DateTime>> idUpdate
//Tuple has the state of the query (true=active or false=suspended)
//and the time the state of the query updated
IObservable<Tuple<bool, DateTime>> queryStateUpdate 

我想创建

//A hot observable that pushes true whenever the query should execute
IObservable<bool> execute 

我把它分成两个可以合并的case,但是我不知道如何创建case observable

  • 情况a)角色Id更新&最后一个状态是Active
  • 情况b)状态更新为有源&&这是角色Id更新后的第一个活动状态

我已经看过视频,李坎贝尔网站,初学者TOC等,但我似乎找不到一个很好的例子,这个rx加入。关于如何创建执行或案例可观察对象有什么想法吗?

Rx Window, Join, GroupJoin?

鉴于所描述的问题-这有点模糊,因为我没有看到实际id (Guid)用于什么,也没有DateTime值-我有以下查询似乎可以解决您的问题:

IObservable<bool> execute =
    idUpdate
        .Publish(_idUpdate =>
            from qsu in queryStateUpdate
            select qsu.Item1
                ? _idUpdate.Select(x => true) 
                : Observable.Empty<bool>())
        .Switch();

我已经用以下idUpdate &queryStateUpdate可见。

var rnd = new Random();
IObservable<Tuple<Guid, DateTime>> idUpdate =
    Observable
        .Generate(
            0,
            n => n < 10000,
            n => n + 1,
            n => Tuple.Create(Guid.NewGuid(), DateTime.Now),
            n => TimeSpan.FromSeconds(rnd.NextDouble() * 0.1));
IObservable<Tuple<bool, DateTime>> queryStateUpdate =
    Observable
        .Generate(
            0,
            n => n < 100,
            n => n + 1,
            n => n % 2 == 0,
            n => TimeSpan.FromSeconds(rnd.NextDouble() * 2.0))
        .StartWith(true)
        .DistinctUntilChanged()
        .Select(b => Tuple.Create(b, DateTime.Now));

如果你能澄清一下你的问题,我可能会提供一个更好的答案来满足你的需要。


编辑:增加了当Id在非活动时发生变化时所需的"replay(1)"行为。

请注意,我已经摆脱了使用DateTime的元组的需要。

IObservable<Guid> idUpdate = ...
IObservable<bool> queryStateUpdate = ...
var replay = new ReplaySubject<Guid>(1);
var disposer = new SerialDisposable();
Func<bool, IObservable<bool>, IObservable<Guid>,
    IObservable<Guid>> getSwitch = (qsu, qsus, iu) =>
{
    if (qsu)
    {
        return replay.Merge(iu);
    }
    else
    {
        replay.Dispose();
        replay = new ReplaySubject<Guid>(1);
        disposer.Disposable = iu.TakeUntil(qsus).Subscribe(replay);
        return Observable.Empty<Guid>();
    }
};
var query =
    queryStateUpdate
        .DistinctUntilChanged()
        .Publish(qsus =>
            idUpdate
            .Publish(ius =>
                qsus
                    .Select(qsu =>
                        getSwitch(qsu, qsus, ius))))
        .Switch();

我读到的问题是说有一个通知idUpdate流,只要设置了queryStateUpdate,就会处理。如果没有设置queryStateUpdate,则通知应该暂停,直到再次设置queryStateUpdate

在这种情况下,连接运算符不能解决您的问题。

我建议你在queryStateUpdate未设置时需要某种形式的缓存,即

List<Tuple<Guid,DateTime>> cache = new List<Tuple<Guid,DateTime>>();
Subject<Tuple<Guid,DateTime>> execute = new Subject<Tuple<Guid,DateTime>>();
idUpdate.Subscribe( x => {
    if (queryStateUpdate.Last().Item1) //might be missing something here with Last, you might need to copy the state out
        exeucte.OnNext(x);
    else
        cache.Add(x);
    });
queryStateUpdate.Subscribe(x=> {
    if (x.Item1)
    {
       //needs threadsafety
       foreach(var x in cache)
           execute.OnNext(x);
      cache.Clear();
    });

感谢Enigmativity和AlSki。使用缓存,我找到了答案。

var execute = new Subject<Guid>();
var cache = new Stack<Guid>();
idUpdate.CombineLatest(queryStateUpdate, (id, qs) => new { id, qs }).Subscribe( anon =>
{
  var id = anon.id;
  var queryState = anon.qs;
  //The roleId updated after the queryState updated
  if (id.Item2 > queryState.Item2)
  {
      //If the queryState is active, call execute
      if (observationState.Item1)
      {
         cache.Clear();
         execute.OnNext(roleId.Item1);
         return;
      }
      //If the id updated and the state is suspended, cache it
      cache.Push(id.Item1);
  }
  //The queryState updated after the roleId
  else if (queryState.Item2 > roleId.Item2)
  {
     //If the queryState is active and a roleId update has been cached, call execute
     if (queryState.Item1 && cache.Count > 0)
     {
         execute.OnNext(cache.Pop());
         cache.Clear();
     }
  }});