IObservable TakeLast(n) and blocking
本文关键字:and blocking TakeLast IObservable | 更新日期: 2023-09-27 18:12:41
我试图获得一个可观察对象的20个最新值,并将其暴露为属性而不阻塞发生。现在,我的代码看起来像:
class Foo
{
private IObservable<int> observable;
public Foo(IObservable<int> bar)
{
this.observable = bar;
}
public IEnumerable<int> MostRecentBars
{
get
{
return this.observable.TakeLast(20).ToEnumerable();
}
}
}
然而,当MostRecentBars getter被调用时,这是阻塞的,可能是因为ToEnumerable在至少有20个观察值之前不会返回。
是否有一种内置的方法可以在不阻塞的情况下暴露最多20个可观察对象的最新值?
我给你两个选择。一个用Rx Scan
算符,但是我认为它读起来有点复杂。另一个使用带锁的标准Queue
。
(1)
class Foo
{
private int[] bars = new int[] { };
public Foo(IObservable<int> bar)
{
bar
.Scan<int, int[]>(
new int[] { },
(ns, n) =>
ns
.Concat(new [] { n, })
.TakeLast(20)
.ToArray())
.Subscribe(ns => bars = ns);
}
public IEnumerable<int> MostRecentBars
{
get
{
return bars;
}
}
}
(2)
class Foo
{
private Queue<int> queue = new Queue<int>();
public Foo(IObservable<int> bar)
{
bar.Subscribe(n =>
{
lock (queue)
{
queue.Enqueue(n);
if (queue.Count > 20)
{
queue.Dequeue();
}
}
});
}
public IEnumerable<int> MostRecentBars
{
get
{
lock (queue)
{
return queue.ToArray();
}
}
}
}
我想不出一个内置的Rx操作符可以满足您的要求。你可以这样实现它:
class Foo
{
private IObservable<int> observable;
private Queue<int> buffer = new Queue<int>();
public Foo(IObservable<int> bar)
{
this.observable = bar;
this.observable
.Subscribe(item =>
{
lock (buffer)
{
if (buffer.Count == 20) buffer.Dequeue();
buffer.Enqueue(item);
}
});
}
public IEnumerable<int> MostRecentBars
{
get
{
lock (buffer)
{
return buffer.ToList(); // Create a copy.
}
}
}
}
虽然你已经得到了你的答案,我正在考虑解决这个使用重放主题与缓冲区,并提出了类似的东西:
class Foo
{
private ReplaySubject<int> replay = new ReplaySubject<int>(20);
public Foo(IObservable<int> bar)
{
bar.Subscribe(replay);
}
public IEnumerable<int> MostRecentBars
{
get
{
var result = new List<int>();
replay.Subscribe(result.Add); //Replay fill in the list with buffered items on same thread
return result;
}
}
}
让我知道这是否符合你的问题。
我有一些扩展,我倾向于附加到我用响应式扩展构建的任何项目,其中之一是滑动窗口:
public static IObservable<IEnumerable<T>> SlidingWindow<T>(this IObservable<T> o, int length)
{
Queue<T> window = new Queue<T>();
return o.Scan<T, IEnumerable<T>>(new T[0], (a, b) =>
{
window.Enqueue(b);
if (window.Count > length)
window.Dequeue();
return window.ToArray();
});
}
返回一个包含最近N个元素的数组(如果还没有N个元素,则返回更少)。
对于您的情况,您应该能够这样做:
class Foo
{
private IObservable<int> observable;
private int[] latestWindow = new int[0];
IDisposable slidingWindowSubscription;
public Foo(IObservable<int> bar)
{
this.observable = bar;
slidingWindowSubscription = this.observable.SlidingWindow(20).Subscribe(a =>
{
latestWindow = a;
});
}
public IEnumerable<int> MostRecentBars
{
get
{
return latestWindow;
}
}
}
说真的,这一切不都取决于你愿意等多久吗?当你手头有19个项目时,你怎么知道观察对象已经完成了?一秒钟后你确定完成了吗?十年后?你确定完成了吗?你怎么知道的?它是可观察的,所以你必须一直观察它,直到你的操作符或你应用的任何一元变换对传入流做了一些有用的变换。
我认为(可能是新的添加)窗口或缓冲区与TimeSpan过载将工作。尤其是Window,它释放了一个Observable的Observable,所以一旦外部Observable被创建,你就可以监听20个item中的First,然后你需要仔细监听OnCompleted,否则你就失去了Window操作符的全部意义,但是你明白了。