Rx -组合最新队列
本文关键字:队列 最新 组合 Rx | 更新日期: 2023-09-27 18:12:52
我正在尝试使用Rx实现队列功能(我知道我可以使用队列+锁定完成它,但试图学习+使用Rx,因为我没有很多机会使用它)。
功能是,我想对一个事件采取一些行动,但一次只想处理一个,一旦完成,处理下一个/最后一个(如果它是新的)。
这是我目前所拥有的(使用递增标志可观察对象,所以我可以做DistinctUntilChanged
,但这感觉像一个hack)。
// Some source
var events = new Subject<string>();
events.Subscribe(s => Console.WriteLine("Event: " + s));
// How can I get rid of this
var counter = 0;
var flag = new Subject<int>();
flag.Subscribe(i => Console.WriteLine("Flag: " + i));
var combined = events
.CombineLatest(flag, (s, i) => new {Event = s, Flag = i});
var worker = combined
.DistinctUntilChanged(arg => arg.Flag)
.DistinctUntilChanged(arg => arg.Event);
worker.Subscribe(x => Console.WriteLine("'r'nDo Work: " + x.Event + "'r'n"));
flag.OnNext(counter++); // Ready
events.OnNext("one"); // Idle, Start eight
events.OnNext("two");
events.OnNext("three");
events.OnNext("four");
events.OnNext("five"); // Process
flag.OnNext(counter++); // Finished one, start five
events.OnNext("six");
events.OnNext("seven");
events.OnNext("eight"); // Idle, Start eight
flag.OnNext(counter++); // Finished five, start eight
flag.OnNext(counter++); // Finished eight
events.OnNext("nine"); // Should be processed
如果您运行这段代码,您将看到最后一个事件没有得到处理,即使参与者是空闲的。
感觉我在这里错过了什么....
目前我正在考虑使用observable的observable ....但在过去的几个小时里,我一直在努力弄清楚这个问题:-
<标题>编辑将subject更改为ReplaySubject
var events = new ReplaySubject<string>(1);
并引入了一个变量,但看起来像是另一个hack,但它确实让我摆脱了计数器。
string lastSeen = null;
flag.Select(x => events.Where(s => s != lastSeen).Take(1))
.Subscribe(x =>
x.Subscribe(s =>
{
lastSeen = s;
Console.WriteLine(s);
})
);
如果有人知道一个更好的方法,我可以摆脱string lastSeen
或简化我的嵌套/订阅,这将是伟大的
如果你愿意在TPL数据流块中混合,你可以做你想做的。虽然我必须说这有点不像爱尔兰语。
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var queue = new BroadcastBlock<long>(null);
var subscription = queue.AsObservable().DistinctUntilChanged().Subscribe(l =>
{
Thread.Sleep(2500);
Console.WriteLine(l);
});
source.SubscribeOn(ThreadPoolScheduler.Instance).Subscribe(queue.AsObserver());
BroadcastBlock是只保存最后一个值的队列。TPL数据流块确实对Rx有很好的支持。我们可以订阅事件并把它放入BroadcastBlock然后订阅BroadcastBlock。