Rx.Net GroupBy实现很少缺少元素序列
本文关键字:元素 Net GroupBy 实现 Rx | 更新日期: 2023-09-27 18:15:15
我使用的是RX 2.2.5。我有两个视图用
加载子订单 _transportService
.ObserveSubOrder(parentOrder.OrderId)
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Where(subOs => subOs != null)
.Snoop("BeforeGrpBy")
.GroupBy(subOs => subOs.OrderId)
.Subscribe(subOrdUpdates =>
{
AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));
})
在groupBy之前它得到了所有的元素序列,在groupBy之后出现的问题是它很少遗漏元素序列。我不认为这是并发问题,因为从日志中可以明显看出。使用自定义Snoop扩展方法生成这些日志。
16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on.
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed.
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125})
格式时间:(线程):Msg
前面可以看到,groupby onNext被调用了两次,但是在它错过了一次之后。这里有什么错误的Rx语法或它是已知的问题?有什么有用的见解吗?如果需要进一步澄清,请评论。
更新:添加工作/需要的日志:
16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on.
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed.
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875})
Update2:可能的bug或特性
GroupBy只在fireNewMapEntry为true时触发groupedoobservable (GroupBy.cs),这发生在这里
if (!_map.TryGetValue(key, out writer))
{
writer = new Subject<TElement>();
_map.Add(key, writer);
fireNewMapEntry = true;
}
其中_map的类型为Dictionary<TKey, ISubject<TElement>>
。这可能是问题所在吗?
只是一些关于你的代码风格的注释(对不起,这不是一个真正的答案,因为我认为@supertopi已经回答了)
-
移动您的
SubscribeOn
和ObserveOn
呼叫是您在最终订阅之前做的最后一件事。在您当前的代码中,您在_uiScheduler
上执行Where
,Snoop
和GroupBy
,占用宝贵的周期。 -
避免在订阅中订阅。看起来
AddIfNew
接受一个密钥和一个IObservable<T>
,因此我假设它在内部执行一些订阅。相反,依靠你所知道的。如果您正在使用GroupBy,那么您知道第一次生成组时Key将是唯一的。所以这现在可以只是一个Add(如果它是你正在检查的键)。如果想要显式,也可以使用Take(1)
。如果你检查的是值而不是键,那么GroupBy
似乎是多余的。 -
尽量保持你的变量名称一致,所以作为另一个开发人员正在阅读查询,他们很好地引导,而不是在
subOs
,childOs
和childUpdates
之间跳转,当childOrder
似乎是一个更好的名字(imo) -
理想情况下,不要在可观察序列中返回空值。它的目的是什么?在某些罕见的情况下,这可能是有意义的,但我经常发现使用null代替
OnCompleted
来表示该序列没有值。
。
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.Where(childOrder => childOrder != null)
.Snoop("BeforeGrpBy")
.GroupBy(childOrder => childOrder.OrderId)
.SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder))
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(newGroup =>
{
Add(newGroup.Item1, newGroup.Item2);
},
ex=>//obviously we have error handling here ;-)
);
或
_transportService
.ObserveSubOrder(parentOrder.OrderId)
.Where(childOrder => childOrder != null)
.Snoop("BeforeGrpBy")
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(childOrder =>
{
AddIfNew(childOrder.OrderId, childOrder);
},
ex=>//obviously we have error handling here ;-)
);
甚至更好(没有snoop和null检查)
var subscription = _transportService
.ObserveSubOrder(parentOrder.OrderId)
.SubscribeOn(_backgroundScheduler)
.ObserveOn(_uiScheduler)
.Subscribe(
childOrder => AddIfNew(childOrder.OrderId, childOrder),
ex=>//obviously we have error handling here ;-)
);
hth
你错过了GroupBy
的本质。
操作符只在出现新组后才发出OnNext
(参见实现GroupBy.cs:67)。在您的示例中,两个通知的orderID
等于,因此只发出一个OnNext
。
操作符发出的值是IGroupedObservable<T>
,如果需要访问组内的进一步通知,可以订阅该值。