如何修改IObservable我收集角色,直到有一段时间没有角色
本文关键字:角色 一段时间 何修改 修改 char IObservable | 更新日期: 2023-09-27 17:49:44
我想写一个Rx查询,它接受IObvservable<char>
并产生IObservable<string>
。字符串应该被缓冲,直到在指定的时间内没有产生任何字符。
数据源是一个串行端口,我从中捕获了DataReceived
事件,并从中产生了IObservable<char>
。我所处理的协议基本上是基于字符的,但它在实现上不是很一致,所以我需要以各种不同的方式观察字符流。在某些情况下,有一个响应结束结束符(但不是换行符),在一种情况下,我得到一个未知长度的字符串,我知道它已经到达的唯一方法是在几百毫秒内没有其他东西到达。这就是我想解决的问题。
我发现
var result = source.Buffer(TimeSpan.FromMilliseconds(200))
.Select(s=>new string(s.ToArray()));
Buffer(TimeSpan)
几乎是我需要的,但不完全是。我需要计时器在每次新字符到达时重置,以便仅在距离最后一个字符已经过了足够的时间时才产生缓冲区。
请问,谁能提供一个建议,如何实现这一点?
(更新)当我在等待答案的时候,我想到了一个我自己的解决方案,基本上是重新发明了油门:
public virtual IObservable<string> BufferUntilQuiescentFor(IObservable<char> source, TimeSpan quietTime)
{
var shared = source.Publish().RefCount();
var timer = new Timer(quietTime.TotalMilliseconds);
var bufferCloser = new Subject<Unit>();
// Hook up the timer's Elapsed event so that it notifies the bufferCloser sequence
timer.Elapsed += (sender, args) =>
{
timer.Stop();
bufferCloser.OnNext(Unit.Default); // close the buffer
};
// Whenever the shared source sequence produces a value, reset the timer, which will prevent the buffer from closing.
shared.Subscribe(value =>
{
timer.Stop();
timer.Start();
});
// Finally, return the buffered sequence projected into IObservable<string>
var sequence = shared.Buffer(() => bufferCloser).Select(s=>new string(s.ToArray()));
return sequence;
}
我没有正确理解Throttle
,我认为它的行为与实际情况不同-现在我已经用"大理石图"向我解释了它,我正确地理解了它,我相信它实际上是一个更优雅的解决方案,我想出了(我还没有测试我的代码,要么)。这是一个有趣的练习;-)
这一切都归功于Enigmativity——我只是在这里重复它,以配合我添加的解释。
var dueTime = TimeSpan.FromMilliseconds(200);
var result = source
.Publish(o => o.Buffer(() => o.Throttle(dueTime)))
.Select(cs => new string(cs.ToArray()));
它的工作方式如下图所示(其中dueTime
对应三段时间):
source: -----h--el--l--o----wo-r--l-d---|
throttled: ------------------o------------d|
buffer[0]: -----h--el--l--o--|
buffer[1]: -wo-r--l-d--|
result: ------------------"hello"------"world"
使用Publish
只是为了确保Buffer
和Throttle
共享对底层source
的单个订阅。Throttle
的文档:
忽略可观察序列中的值,这些值在到期时间之前后跟另一个值…
正在使用的Buffer
的过载接受一系列"缓冲区关闭"。每当序列发出一个值时,当前缓冲区结束,下一个缓冲区开始。
这是你需要的吗?
var result =
source
.Publish(hot =>
hot.Buffer(() =>
hot.Throttle(TimeSpan.FromMilliseconds(200))))
.Select(s => new string(s.ToArray()));