有没有办法重置observable . window的windowcloseselect函数?

本文关键字:windowcloseselect 函数 window observable 有没有 | 更新日期: 2023-09-27 18:02:55

我的目标是为"Type2"对象每5秒调用一个更新处理程序不超过一次。这个可观察对象每5秒会生成不止一个值,但是我想忽略在最后一次处理的更新后5秒内发生的所有值。

我在这里问了这个问题:仅在满足特定条件时节流

并得到了很好的反馈。它引导我使用Observable。窗口努力实现我的目标。我以为我有它的工作,但事实证明,它可以产生不正确的输出,如果第一次更新来之前,一个窗口关闭(所以更新处理),然后一旦下一个窗口打开,另一个更新到达,也处理,当我不希望它,因为它来在5秒内最后处理的更新。

下面是一些代码来演示这个问题,稍微修改了链接中的代码:

var source = new Subject<Thing>();    
var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
    .Where(t => t.ActivationType == "Type2")
    .Window(() =>
            Observable.Timer(TimeSpan.FromSeconds(5))
            .Do(t => Console.WriteLine("'nTICK: " + DateTime.Now.ToString("hh:mm:ss:fff"))))
    .Select(x => x.Take(1))
    .Merge()
    .Do(t => Console.WriteLine("A new window opened " + DateTime.Now.ToString("hh:mm:ss:fff")));

var query = ofType1.Merge(ofType2);        
query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));    
int msDelay = 3000;
Task task = Task.Factory
    .StartNew(() => { Thread.Sleep(msDelay); })
    .ContinueWith((Task starter) =>
        {
            while (running)
            {
                var thing = new Thing();  //Note that all Things are by default Type2
                source.OnNext(thing);
                Thread.Sleep(100);
            }
        }, TaskContinuationOptions.LongRunning);
Console.ReadLine();

所以,订阅完成了,一旦订阅完成,Observable。窗口中使用的计时器开始。用于生成值的while循环直到延迟3000 ms后才开始。

输出如下:

A new window opened 03:48:03:725
UPDATE: 1ac54fb3-f73d-4840-b4d8-95d4250ce65d 03:48:03:752
TICK: 03:48:05:714
A new window opened 03:48:05:754
UPDATE: 12d36e53-010f-4ccd-b9f8-2951b085f88c 03:48:05:754
TICK: 03:48:10:730
A new window opened 03:48:10:755
UPDATE: 25d84e72-94f9-4f50-83f4-14c1004c10fa 03:48:10:755
TICK: 03:48:15:738
A new window opened 03:48:15:755
UPDATE: 5f32b7d5-196f-445c-bf25-5c362b2fd6f0 03:48:15:755
TICK: 03:48:20:747
A new window opened 03:48:20:756
UPDATE: e3a3a30d-8031-41b5-b115-499dbe91aaf7 03:48:20:756
TICK: 03:48:25:755
A new window opened 03:48:25:756
UPDATE: 239fb25b-5135-463b-bf7e-5728ffa07f5c 03:48:25:756
如您所见,第一个Type2更新是在窗口打开时进入的,因此它得到了处理。然后,2秒后,窗口的计时器滴答作响,并打开一个新窗口。它立即处理下一个Type2更新,这是我不希望它做的。之后,它看起来正常工作(每5秒更新一次,在窗口声明中定义)。

是否有一种方法或另一种方法,我可以使用,以确保只有一个更新每5秒(或任何我选择的时间框架)被处理?

有没有办法重置observable . window的windowcloseselect函数?

我想我有一个解决方案,但我能先提一些建议吗?我认为这个问题中有很多杂音,很难发现真正的问题是什么。

实际上你是在问"我如何才能得到一个值,然后至少沉默5秒"。Type1代码是一种干扰。序列的生成也让人分心。

那么让我们清理示例代码,看看我们是否可以看到树木的木材:

首先,我不认为这是完全相关的类型是通过。在您的示例中,我们从不推入Type1,所以我们只使用整数代替。这可能会使它更容易。

接下来,我们可以通过使用一个Observable来清理创建。定时器代替大循环+任务+线程。睡眠的东西。

现在我们有一个简单的起始位置:

var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool);   
var feed = source.Publish().RefCount();

所以我们的第一个问题是对你正在使用的窗口重载的误解。我想你的期望是,当第一个值被推入时,它将打开一个窗口。事实并非如此。计时器(即Observable.Timer(TimeSpan.FromSeconds(5)))在每次窗口打开时被订阅,这是最初的订阅发生时,然后在窗口本身关闭时再次订阅。因此,计时器立即启动,您只会在第一个窗口获得2秒的值。

接下来,我要画出我的问题空间。我最喜欢的方法是用大理石图表。它们在ASCII中翻译得不太好,但让我们试一试。

给定这个输入序列:

//Seconds             1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths    01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
//
//source  : ------------------------------0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0

这应该表示3.0秒产生的值'0'。然后是3.3秒的"1"等等。

现在有了一点清晰的问题空间,我们可以画出我们认为窗口应该在哪里打开,应该在哪里关闭,以及下一个窗口应该在哪里打开。

让我们看看我们想要什么。

这里我们添加了一个Window1 (W1),当第一个值被按下('1' @3.0)时打开。5秒后关闭。在这个窗口中,我们希望在第一个值1之后保持沉默。

窗口2 (W2)应该在产生下一个值时打开,而不是在最后一个窗口关闭后立即打开(我认为?!)。在这里,我们看到当值'17'在8.4秒的时候被打开。

//Seconds             1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths    01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
//source  : ------------------------------0--1--2--3--4--5--6--7--8--9--1--1--1--1--1--1--1--1--1--1--2--2--2--2--2--2--2--2--2--2--3--3--3--3--3--3--3--3--3--3--4--4--4--4-
                                                                        0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5  6  7  8  9  0  1  2  3  4  5  6  7  8  9  0  1  2  3
//
//W1      :                               0-------------------------------------------------|
//W2      :                                                                                  (17)----------------------------------------------|
//W3      :                                                                                                                                     (34)------------------------>
//expected: ------------------------------0--------------------------------------------------1--------------------------------------------------3---------------------------
                                                                                             7                                                  4

现在我们知道了要查找的值,就可以构造查询了。

我想出了这个。我假设提要实际上是一个Hot序列。使用这个假设,我构造了一个重复的结构,它从提要中获取1个值,并将5秒的沉默连接到序列中。然后我添加Repeat操作符,它只会在5秒静默过后重新订阅提要。

public static IObservable<T> Silencer<T>(this IObservable<T> source, TimeSpan minSilencePeriod)
{
    return source.Take(1)
                 .Concat(Observable.Empty<T>().Delay(minSilencePeriod))
                 .Repeat();
}

这将生成0、17、51等值。

现在将其应用到原始问题的代码中(清理一些东西)

void Main()
{
    var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool).Select(_=>new Thing());   
    var feed = source.Publish().RefCount();
    var ofType1 = feed.Where(t => t.ActivationType == "Type1");
    var ofType2 = feed
            .Where(t => t.ActivationType == "Type2")
            .Silencer(TimeSpan.FromSeconds(5));

    var query = ofType1.Merge(ofType2);        
    var subscription = query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));    

    Console.ReadLine();
    subscription.Dispose();
}

我们看到至少间隔5秒的输出值

UPDATE: 3f0fc6f3-8a5a-476f-9661-b7330ab77877 09:14:04:725
UPDATE: fc8f0025-7a79-4329-8164-b8b421ad5865 09:14:09:817
UPDATE: ad739a71-885e-4d5b-a352-2302df0a4d87 09:14:14:925
相关文章:
  • 没有找到相关文章