对一段时间内未发生不同事件的事件的响应式订阅
本文关键字:事件 响应 一段时间 | 更新日期: 2023-09-27 17:58:30
我正试图弄清楚如何为一个事件创建一个Reactive订阅,而不是在特定的时间窗口内创建一个不同的事件。
为了说明,这里有一个用例:
由两个事件触发的繁忙指示器。忙和不忙。它们可以近距离发射,但指示灯不应经常闪烁。
当NotBusy激发时,应该没有指示器。当Busy启动并且NotBusy在5秒内没有启动时,它应该显示出来。
有没有一种方法可以在不添加外部状态的情况下完全在Reactive中做到这一点?
啊,我忍不住了——你多久会反驳一次源材料的作者(参见Benjol对问题的评论):)
这是我的尝试(LINQPad就绪):
输出如下:
At 1/1/0001 12:00:00 AM +00:00, Busy Signal Indicator is now:OFF
Sending BUSY at 1/1/0001 12:00:00 AM +00:00
Sending NOTBUSY at 1/1/0001 12:00:02 AM +00:00
Sending BUSY at 1/1/0001 12:00:03 AM +00:00
Sending BUSY at 1/1/0001 12:00:06 AM +00:00
At 1/1/0001 12:00:08 AM +00:00, Busy Signal Indicator is now:ON
Sending NOTBUSY at 1/1/0001 12:00:09 AM +00:00
At 1/1/0001 12:00:09 AM +00:00, Busy Signal Indicator is now:OFF
以下是我们活动的基本定义:
enum EventType
{
Busy,
NotBusy
}
class StreamEvent
{
public EventType Type {get; set;}
public StreamEvent(EventType type) { Type = type;}
}
这是查询+测试代码:
void Main()
{
// our simulated event stream
var fakeSource = new System.Reactive.Subjects.Subject<StreamEvent>();
// Let's use a scheduler we actually don't have to wait for
var theTardis = new System.Reactive.Concurrency.HistoricalScheduler();
var busySignal = fakeSource
// Batch up events:
.Window(
// Starting batching on a busy signal
fakeSource.Where(e => e.Type == EventType.Busy),
// Stop batching on a not busy signal
(open) => fakeSource.Where(e => e.Type == EventType.NotBusy)
// but throw a timeout if we exceed 5 seconds per window
.Timeout(TimeSpan.FromSeconds(5), theTardis))
// Unpack the windows
.Switch()
// Catch any timeout exception and inject a NULL into the stream
.Catch(fakeSource.StartWith((StreamEvent)null))
// Bool-ify on "did a timeout happen?"
.Select(e => e == null)
// Start in an "unbusy" state
.StartWith(false)
// And only tell us about transitions
.DistinctUntilChanged();
using(busySignal.Subscribe(signal =>
Console.WriteLine("At {0}, Busy Signal Indicator is now:{1}",
theTardis.Now,
signal ? "ON" : "OFF")))
{
// should not generate a busy signal
Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(2));
Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.NotBusy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
Console.WriteLine();
// should generate a busy signal
Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(3));
Console.WriteLine("Sending BUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.Busy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(3));
// and this should clear it
Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now);
fakeSource.OnNext(new StreamEvent(EventType.NotBusy));
theTardis.AdvanceBy(TimeSpan.FromSeconds(1));
Console.WriteLine();
}
}