如何修改IObservable我收集角色,直到有一段时间没有角色

本文关键字:角色 一段时间 何修改 修改 char IObservable | 更新日期: 2023-09-27 17:49:44

我想写一个Rx查询,它接受IObvservable<char>并产生IObservable<string>。字符串应该被缓冲,直到在指定的时间内没有产生任何字符。

数据源是一个串行端口,我从中捕获了DataReceived事件,并从中产生了IObservable<char>。我所处理的协议基本上是基于字符的,但它在实现上不是很一致,所以我需要以各种不同的方式观察字符流。在某些情况下,有一个响应结束结束符(但不是换行符),在一种情况下,我得到一个未知长度的字符串,我知道它已经到达的唯一方法是在几百毫秒内没有其他东西到达。这就是我想解决的问题。

我发现

var result = source.Buffer(TimeSpan.FromMilliseconds(200))
                    .Select(s=>new string(s.ToArray()));

Buffer(TimeSpan)几乎是我需要的,但不完全是。我需要计时器在每次新字符到达时重置,以便仅在距离最后一个字符已经过了足够的时间时才产生缓冲区。

请问,谁能提供一个建议,如何实现这一点?

(更新)当我在等待答案的时候,我想到了一个我自己的解决方案,基本上是重新发明了油门:

    public virtual IObservable<string> BufferUntilQuiescentFor(IObservable<char> source, TimeSpan quietTime)
    {
        var shared = source.Publish().RefCount();
        var timer = new Timer(quietTime.TotalMilliseconds);
        var bufferCloser = new Subject<Unit>();
        // Hook up the timer's Elapsed event so that it notifies the bufferCloser sequence
        timer.Elapsed += (sender, args) =>
        {
            timer.Stop();
            bufferCloser.OnNext(Unit.Default);  // close the buffer
        };
        // Whenever the shared source sequence produces a value, reset the timer, which will prevent the buffer from closing.
        shared.Subscribe(value =>
        {
            timer.Stop();
            timer.Start();
        });
        // Finally, return the buffered sequence projected into IObservable<string>
        var sequence = shared.Buffer(() => bufferCloser).Select(s=>new string(s.ToArray()));
        return sequence;
    }

我没有正确理解Throttle,我认为它的行为与实际情况不同-现在我已经用"大理石图"向我解释了它,我正确地理解了它,我相信它实际上是一个更优雅的解决方案,我想出了(我还没有测试我的代码,要么)。这是一个有趣的练习;-)

如何修改IObservable<char>我收集角色,直到有一段时间没有角色

这一切都归功于Enigmativity——我只是在这里重复它,以配合我添加的解释。

var dueTime = TimeSpan.FromMilliseconds(200);
var result = source
    .Publish(o => o.Buffer(() => o.Throttle(dueTime)))
    .Select(cs => new string(cs.ToArray()));

它的工作方式如下图所示(其中dueTime对应三段时间):

source:    -----h--el--l--o----wo-r--l-d---|
throttled: ------------------o------------d|
buffer[0]: -----h--el--l--o--|
buffer[1]:                    -wo-r--l-d--|
result:    ------------------"hello"------"world"

使用Publish只是为了确保BufferThrottle共享对底层source的单个订阅。Throttle的文档:

忽略可观察序列中的值,这些值在到期时间之前后跟另一个值…

正在使用的Buffer的过载接受一系列"缓冲区关闭"。每当序列发出一个值时,当前缓冲区结束,下一个缓冲区开始。

这是你需要的吗?

var result =
    source
        .Publish(hot =>
            hot.Buffer(() =>
                hot.Throttle(TimeSpan.FromMilliseconds(200))))
       .Select(s => new string(s.ToArray()));