Rx.Net GroupBy实现很少缺少元素序列

本文关键字:元素 Net GroupBy 实现 Rx | 更新日期: 2023-09-27 18:15:15

我使用的是RX 2.2.5。我有两个视图用

加载子订单
           _transportService
            .ObserveSubOrder(parentOrder.OrderId)
            .SubscribeOn(_backgroundScheduler)
            .ObserveOn(_uiScheduler)
            .Where(subOs => subOs != null)                
            .Snoop("BeforeGrpBy")
            .GroupBy(subOs => subOs.OrderId)
            .Subscribe(subOrdUpdates =>
            {
                AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));                        
            })

在groupBy之前它得到了所有的元素序列,在groupBy之后出现的问题是它很少遗漏元素序列。我不认为这是并发问题,因为从日志中可以明显看出。使用自定义Snoop扩展方法生成这些日志。

16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})

格式时间:(线程):Msg

前面可以看到,groupby onNext被调用了两次,但是在它错过了一次之后。这里有什么错误的Rx语法或它是已知的问题?有什么有用的见解吗?如果需要进一步澄清,请评论。

更新:添加工作/需要的日志:

16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})

Update2:可能的bug或特性

GroupBy只在fireNewMapEntry为true时触发groupedoobservable (GroupBy.cs),这发生在这里

 if (!_map.TryGetValue(key, out writer))
 {
    writer = new Subject<TElement>();
    _map.Add(key, writer);
    fireNewMapEntry = true;
  }

其中_map的类型为Dictionary<TKey, ISubject<TElement>>。这可能是问题所在吗?

Rx.Net GroupBy实现很少缺少元素序列

只是一些关于你的代码风格的注释(对不起,这不是一个真正的答案,因为我认为@supertopi已经回答了)

  1. 移动您的SubscribeOnObserveOn呼叫是您在最终订阅之前做的最后一件事。在您当前的代码中,您在_uiScheduler上执行Where, SnoopGroupBy,占用宝贵的周期。

  2. 避免在订阅中订阅。看起来AddIfNew接受一个密钥和一个IObservable<T>,因此我假设它在内部执行一些订阅。相反,依靠你所知道的。如果您正在使用GroupBy,那么您知道第一次生成组时Key将是唯一的。所以这现在可以只是一个Add(如果它是你正在检查的键)。如果想要显式,也可以使用Take(1)。如果你检查的是值而不是键,那么GroupBy似乎是多余的。

  3. 尽量保持你的变量名称一致,所以作为另一个开发人员正在阅读查询,他们很好地引导,而不是在subOs, childOschildUpdates之间跳转,当childOrder似乎是一个更好的名字(imo)

  4. 理想情况下,不要在可观察序列中返回空值。它的目的是什么?在某些罕见的情况下,这可能是有意义的,但我经常发现使用null代替OnCompleted来表示该序列没有值。

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .GroupBy(childOrder => childOrder.OrderId)
        .SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder))
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(newGroup =>
        {
            Add(newGroup.Item1, newGroup.Item2);                        
        },
          ex=>//obviously we have error handling here ;-)
        );

_transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .Where(childOrder => childOrder != null)                
        .Snoop("BeforeGrpBy")
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(childOrder =>
          {
             AddIfNew(childOrder.OrderId, childOrder);                             
          },
          ex=>//obviously we have error handling here ;-)
        );

甚至更好(没有snoop和null检查)

var subscription = _transportService
        .ObserveSubOrder(parentOrder.OrderId)
        .SubscribeOn(_backgroundScheduler)
        .ObserveOn(_uiScheduler)
        .Subscribe(
          childOrder => AddIfNew(childOrder.OrderId, childOrder),
          ex=>//obviously we have error handling here ;-)
        );

hth

你错过了GroupBy的本质。

操作符只在出现新组后才发出OnNext(参见实现GroupBy.cs:67)。在您的示例中,两个通知的orderID等于,因此只发出一个OnNext

操作符发出的值是IGroupedObservable<T>,如果需要访问组内的进一步通知,可以订阅该值。