Buffering selected messages with a variable timeout

Keywords: timeout, message, buffer | Updated: 2023-09-27 17:58:51

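I have a stream containing letters (a-Z) and digits (1-9). I want to join the letters that arrive within a timeout (which may change) and always emit digits immediately. Can you suggest which functions are best suited for this?

Sample working code (I am not sure it is correct and/or a good solution):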

private BehaviorSubject<TimeSpan> sTimeouts = new BehaviorSubject<TimeSpan>(0.ms());
private IObservable<string> lettersJoined(IObservable<char> ob)
{
    return Observable.Create<string>(observer =>
    {
        var letters = new List<char>();
        var lettersFlush = new SerialDisposable();
        return ob.Subscribe(c =>
        {
            if (char.IsUpper(c))
            {
                // await is not allowed in this non-async lambda; BehaviorSubject exposes its latest value directly
                if (sTimeouts.Value.Ticks > 0)
                {
                    letters.Add(c);
                    lettersFlush.Disposable =
                        VariableTimeout(sTimeouts)
                        .Subscribe(x => {
                            observer.OnNext(String.Concat(letters));
                            letters.Clear();
                        });
                }
                else
                    // zero timeout: emit the letter itself (letters.ToString() would yield the List type name)
                    observer.OnNext(c.ToString());

            }
            else if (char.IsDigit(c))
                observer.OnNext(c.ToString());
        });
    });
}

private IObservable<long> VariableTimeout(IObservable<TimeSpan> timeouts)
{
    return Observable.Create<long>(obs =>
    {
        var sd = new SerialDisposable();
        var first = DateTime.Now;
        return timeouts
            .Subscribe(timeout =>
            {
                if (timeout.Ticks == 0 || first + timeout < DateTime.Now)
                {
                    sd.Disposable = null;
                    obs.OnNext(timeout.Ticks);
                    obs.OnCompleted();
                }
                else
                {
                    timeout -= DateTime.Now - first;
                    sd.Disposable =
                        Observable
                        .Timer(timeout)
                        .Subscribe(t => {
                            obs.OnNext(t);
                            obs.OnCompleted();
                        });
                }
            });
    });
}
private void ChangeTimeout(int timeout)
{
    sTimeouts.OnNext(timeout.ms());
}

// I use the following extension method
public static class TickExtensions
{
    public static TimeSpan ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms);
    }
}

To modify the timeout I could simply change a private timeout variable, but a Subject for it would be fine if that is needed or better.

Update

var scheduler = new TestScheduler();
var timeout = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(0000.Ms(), 2000),
    ReactiveTest.OnNext(4300.Ms(), 1000));
var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(1600.Ms(), '2'),
    ReactiveTest.OnNext(1900.Ms(), 'A'),
    ReactiveTest.OnNext(2100.Ms(), 'B'),
    ReactiveTest.OnNext(4500.Ms(), 'C'),
    ReactiveTest.OnNext(5100.Ms(), 'A'),
    ReactiveTest.OnNext(5500.Ms(), '5'),
    ReactiveTest.OnNext(6000.Ms(), 'B'),
    ReactiveTest.OnNext(7200.Ms(), '1'),
    ReactiveTest.OnNext(7500.Ms(), 'B'),
    ReactiveTest.OnNext(7700.Ms(), 'A'),
    ReactiveTest.OnNext(8400.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"),
    ReactiveTest.OnNext(1600.Ms(), "2"),
    ReactiveTest.OnNext(4100.Ms(), "AB"),
    ReactiveTest.OnNext(5500.Ms(), "5"),
    ReactiveTest.OnNext(7000.Ms(), "CAB"),
    ReactiveTest.OnNext(7200.Ms(), "1"),
    ReactiveTest.OnNext(9400.Ms(), "BAA"));

// if ReactiveTest.OnNext(3800.Ms(), 1000)
// then expected is ReactiveTest.OnNext(3800.Ms(), "AB")

Update #2

An improved solution that correctly supports timeout changes during buffering

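A few things here might help.

First, marble diagrams are good for visualizing the problem, but when proving whether something works or not, let's be prescriptive and unit test with ITestableObservable<T> instances.

Second, I am not sure what your solution should be. If I look at your marble diagram, I see some discrepancies. Here I have added a timeline to help visualize it.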

                 111111111122222222223
Time:   123456789012345678901234567890
Input:  1---2--A-B----C--A-B-1--B-A--A
Output: 1---2----AB-------CAB-1-----BAA 

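Here I see the "AB" output published at unit 10. Then I see the "CAB" output published at unit 19. Further, I see the "BAA" output published at unit 29. But you suggest that these should occur a constant timeout apart. So then I thought maybe it is the gap between values that matters, but that does not seem to add up either. This brings me back to the point above: please provide a unit test that can pass or fail.

Third, regarding your implementation, you could make it slightly better by using the SerialDisposable type for lettersFlush.

To help me set up the unit test, I created the following block of code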

var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(0500.Ms(), '2'),
    ReactiveTest.OnNext(0800.Ms(), 'A'),
    ReactiveTest.OnNext(1000.Ms(), 'B'),
    ReactiveTest.OnNext(1500.Ms(), 'C'),
    ReactiveTest.OnNext(1800.Ms(), 'A'),
    ReactiveTest.OnNext(2000.Ms(), 'B'),
    ReactiveTest.OnNext(2200.Ms(), '1'),
    ReactiveTest.OnNext(2500.Ms(), 'B'),
    ReactiveTest.OnNext(2700.Ms(), 'A'),
    ReactiveTest.OnNext(3000.Ms(), 'A'));
var expected = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0100.Ms(), "1"),
    ReactiveTest.OnNext(0500.Ms(), "2"),
    ReactiveTest.OnNext(1000.Ms(), "AB"),
    ReactiveTest.OnNext(2000.Ms(), "CAB"),
    ReactiveTest.OnNext(2200.Ms(), "1"),
    ReactiveTest.OnNext(3000.Ms(), "BAA"));

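I have taken some liberties and changed some of the values to what I think you meant by your marble diagram.

If I then use the very good answer provided by @Shlomo, I can see further issues with relying on fuzzy marble diagrams alone. Because a buffer boundary has to occur after the last value it should include, these windows need to close off by one tick.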

void Main()
{
    var scheduler = new TestScheduler();
    var input = scheduler.CreateColdObservable<char>(
        ReactiveTest.OnNext(0100.Ms(), '1'),
        ReactiveTest.OnNext(0500.Ms(), '2'),
        ReactiveTest.OnNext(0800.Ms(), 'A'),
        ReactiveTest.OnNext(1000.Ms(), 'B'),
        ReactiveTest.OnNext(1500.Ms(), 'C'),
        ReactiveTest.OnNext(1800.Ms(), 'A'),
        ReactiveTest.OnNext(2000.Ms(), 'B'),
        ReactiveTest.OnNext(2200.Ms(), '1'),
        ReactiveTest.OnNext(2500.Ms(), 'B'),
        ReactiveTest.OnNext(2700.Ms(), 'A'),
        ReactiveTest.OnNext(3000.Ms(), 'A'));
    var expected = scheduler.CreateColdObservable<string>(
        ReactiveTest.OnNext(0100.Ms(), "1"),
        ReactiveTest.OnNext(0500.Ms(), "2"),
        ReactiveTest.OnNext(1000.Ms()+1, "AB"),
        ReactiveTest.OnNext(2000.Ms()+1, "CAB"),
        ReactiveTest.OnNext(2200.Ms(), "1"),
        ReactiveTest.OnNext(3000.Ms()+1, "BAA"));
    /*
                     111111111122222222223
    Time:   123456789012345678901234567890
    Input:  1---2--A-B----C--A-B-1--B-A--A
    Output: 1---2----AB-------CAB-1-----BAA 
    */
    //var bufferBoundaries = Observable.Timer(TimeSpan.FromSeconds(1), scheduler);
    //Move to a hot test sequence to force the windows to close just after the values are produced
    var bufferBoundaries = scheduler.CreateHotObservable<Unit>(
        ReactiveTest.OnNext(1000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(2000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(3000.Ms()+1, Unit.Default),
        ReactiveTest.OnNext(4000.Ms()+1, Unit.Default));
    var publishedFinal = input
        .Publish(i => i
            .Where(c => char.IsLetter(c))
            .Buffer(bufferBoundaries)
            .Where(l => l.Any())
            .Select(lc => new string(lc.ToArray()))
            .Merge(i
                .Where(c => char.IsNumber(c))
                .Select(c => c.ToString())
            )
        );
    var observer = scheduler.CreateObserver<string>();
    publishedFinal.Subscribe(observer);
    scheduler.Start();
    //This test passes with the "+1" values hacked in.
    ReactiveAssert.AreElementsEqual(
        expected.Messages,
        observer.Messages);
}
// Define other methods and classes here
public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

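I guess my point is that Rx is deterministic, so we can create deterministic tests. So while your question is a very good one, and I do believe @Shlomo provides a solid final answer, we can do better than fuzzy marble diagrams and using Random in our examples/tests. Being precise here should help prevent silly race conditions in production and help the reader better understand these solutions.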
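As an illustration of that point, here is a minimal sketch of a deterministic test for a simplified variant of the problem. It assumes a fixed timeout (not the variable one the question asks for) and a "quiet period" reading of the requirement: letters are joined until no letter has arrived for the timeout. JoinLetters, QuietPeriodSketch and the sample timings are hypothetical names and values made up for this example; they are not taken from the question or from @Shlomo's answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class QuietPeriodSketch
{
    // Hypothetical helper: joins letters until no letter has arrived for `timeout`;
    // digits pass straight through. The timeout is fixed here (an assumption).
    public static IObservable<string> JoinLetters(
        this IObservable<char> input, TimeSpan timeout, IScheduler scheduler)
    {
        // Publish shares one subscription to the input between both branches.
        return input.Publish(shared =>
        {
            var joined = shared
                .Where(c => char.IsLetter(c))
                // Close the current buffer once the letters have been quiet for `timeout`.
                .Publish(l => l.Buffer(() => l.Throttle(timeout, scheduler)))
                .Where(b => b.Count > 0)
                .Select(b => new string(b.ToArray()));
            var digits = shared
                .Where(c => char.IsDigit(c))
                .Select(c => c.ToString());
            return joined.Merge(digits);
        });
    }

    // Runs the query in virtual time and returns the recorded (time in ms, value) pairs.
    public static List<(long TimeMs, string Value)> Run()
    {
        var scheduler = new TestScheduler();
        var input = scheduler.CreateColdObservable<char>(
            ReactiveTest.OnNext(Ticks(100), '1'),
            ReactiveTest.OnNext(Ticks(800), 'A'),
            ReactiveTest.OnNext(Ticks(1000), 'B'),
            ReactiveTest.OnNext(Ticks(1200), '2'),
            ReactiveTest.OnNext(Ticks(2000), 'C'));
        var observer = scheduler.CreateObserver<string>();
        input.JoinLetters(TimeSpan.FromMilliseconds(500), scheduler).Subscribe(observer);
        scheduler.Start();
        return observer.Messages
            .Select(m => (m.Time / TimeSpan.TicksPerMillisecond, m.Value.Value))
            .ToList();
    }

    private static long Ticks(int ms) => TimeSpan.FromMilliseconds(ms).Ticks;
}
```

Because everything runs on the TestScheduler, the pairs returned by Run() are the same on every run and can be compared against an expected sequence in the same way as the tests above. This sketch does not handle a timeout that changes while letters are being buffered.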

Assuming sampleInput as your sample input:

var charStream = "12ABCAB1BAA".ToObservable();
var random = new Random();
var randomMilliTimings = Enumerable.Range(0, 12)
    .Select(i => random.Next(2000))
    .ToList();
var sampleInput = charStream
    .Zip(randomMilliTimings, (c, ts) => Tuple.Create(c, TimeSpan.FromMilliseconds(ts)))
    .Select(t => Observable.Return(t.Item1).Delay(t.Item2))
    .Concat();

First, rather than changing a mutable variable, it is better to generate a stream that represents your buffer windows:

Input:  1---2--A-B----C--A-B-1--B-A--A
Window: ---------*--------*---------*--
Output: 1---2----AB-------CAB-1-----BAA

I generated a stream of incrementing TimeSpans and called it bufferBoundaries, like so:

var bufferBoundaries = Observable.Range(1, 20)
    .Select(t => Observable.Return(t).Delay(TimeSpan.FromSeconds(t)))
    .Concat();

That would look something like this:

Seconds: 0--1--2--3--4--5--6--7--8--9--10
BB     : ---1-----2--------3-----------4-
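If the window length itself has to change at run time, one possible variation is to derive the boundaries from a stream of timeouts instead of a fixed range. The sketch below assumes a hypothetical timeouts stream and restarts a periodic boundary whenever a new timeout arrives. VariableBoundaries and the sample values are illustrative names only, and the resulting fixed-interval windows may not match the exact semantics the question expects.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class VariableBoundaries
{
    // Hypothetical helper: each new timeout starts a fresh periodic boundary stream.
    public static IObservable<long> From(IObservable<TimeSpan> timeouts, IScheduler scheduler)
    {
        return timeouts
            .Select(t => Observable.Interval(t, scheduler))
            .Switch(); // unsubscribe from the previous interval when a new timeout arrives
    }

    // Records, in virtual time, the millisecond at which each boundary fires.
    public static List<long> Run()
    {
        var scheduler = new TestScheduler();
        var timeouts = scheduler.CreateColdObservable<TimeSpan>(
            ReactiveTest.OnNext(Ticks(100), TimeSpan.FromMilliseconds(1000)),
            ReactiveTest.OnNext(Ticks(2500), TimeSpan.FromMilliseconds(500)));
        var boundaryTimesMs = new List<long>();
        From(timeouts, scheduler)
            .Subscribe(_ => boundaryTimesMs.Add(scheduler.Clock / TimeSpan.TicksPerMillisecond));
        // Interval never completes, so advance to a fixed point instead of calling Start().
        scheduler.AdvanceTo(Ticks(3800));
        return boundaryTimesMs;
    }

    private static long Ticks(int ms) => TimeSpan.FromMilliseconds(ms).Ticks;
}
```

The stream returned by From could then be passed to Buffer in place of bufferBoundaries.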

Next, you will want to split sampleInput into separate streams for letters and numbers, and process each accordingly:

var letters = sampleInput
    .Where(c => char.IsLetter(c))
    .Buffer(bufferBoundaries)
    .Where(l => l.Any())
    .Select(lc => new string(lc.ToArray()));
var numbers = sampleInput
    .Where(c => char.IsNumber(c))
    .Select(c => c.ToString());

Next, merge the two streams together:

var finalOutput = letters.Merge(numbers);

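Lastly, it is generally not a good idea to subscribe twice to the same input (sampleInput in our case) if you can help it. So in our case, we should replace letters, numbers, and finalOutput with the following: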

var publishedFinal = sampleInput
    .Publish(_si => _si
        .Where(c => char.IsLetter(c))
        .Buffer(bufferBoundaries)
        .Where(l => l.Any())
        .Select(lc => new string(lc.ToArray()))
        .Merge( _si
            .Where(c => char.IsNumber(c))
            .Select(c => c.ToString())
        )
    );
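To make the effect of Publish concrete, here is a small sketch that counts how often a cold source is subscribed when it is split and merged with and without Publish. PublishDemo, the "1A2B" input and the counter are made up for this illustration; Observable.Defer is used only so that each subscription can be counted.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PublishDemo
{
    // Returns how many times the cold source was subscribed, plus the merged output.
    public static (int Subscriptions, List<string> Output) Run(bool usePublish)
    {
        var subscriptions = 0;
        // Hypothetical cold source: Defer runs this factory once per subscription.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return "1A2B".ToObservable();
        });

        // Same split-and-merge shape as above: letters on one branch, digits on the other.
        Func<IObservable<char>, IObservable<string>> split = s => s
            .Where(c => char.IsLetter(c))
            .Select(c => "letter " + c)
            .Merge(s
                .Where(c => char.IsNumber(c))
                .Select(c => "digit " + c));

        var query = usePublish ? source.Publish(split) : split(source);

        var output = new List<string>();
        query.Subscribe(x => output.Add(x));
        return (subscriptions, output);
    }

    public static void Main()
    {
        Console.WriteLine(Run(usePublish: false).Subscriptions); // 2
        Console.WriteLine(Run(usePublish: true).Subscriptions);  // 1
    }
}
```

With a source like sampleInput, which builds fresh delays for every subscriber, the extra subscription would repeat that work once per branch.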