无法中断对流中的列表的批量更新(插入)
本文关键字:更新 插入 列表 中断 对流 | 更新日期: 2023-09-27 18:34:41
编辑:为了简化事情,这里有一个范式:我有一个项目列表,它由连续流不断更新。我时不时地会得到一个新的数据快照,用于重新初始化流。因此,如果在我想重新初始化时发生任何更新,我需要确保这些更新停止并使用新快照。
我正在处理许多需要显示到 UI 的连续更新数据流。更新需要以相反的顺序显示,即最新的更新位于列表顶部。为了在顶部显示结果,我必须插入到列表中。我遇到的问题是有时列表需要休息(即 List.Clear(,但是,如果我在中间插入,我需要停止它,因为否则插入会导致异常。
我已经整理了一种反应式方法来帮助我解决这个问题,但是,它似乎忽略了我的直到流。
public static IObservable<T> BufferAndDispatchUntil<T, TStopUnit>(
this IObservable<T> source,
Action<T> onNext,
IScheduler scheduler,
IObservable<TStopUnit> until,
DispatcherPriority dispatcherPriority = DispatcherPriority.Background)
{
if (source == null) throw new ArgumentNullException("source");
if (onNext == null) throw new ArgumentNullException("onNext");
if (Application.Current == null)
return source.Do(onNext);
var dispatcher = Application.Current.Dispatcher;
return source
.LazyBuffer(BufferTime, BufferCount, scheduler)
.TakeUntil(until)
.Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))
.SelectMany(i => i);
}
LazyBuffer 是 Buffer 的自定义实现,它仅在新项可用时返回结果集,而不是在指定的时间间隔内返回空结果集。这就是我调用它的方式,如上所述吹。
BufferAndDispatchUntil(p => Update.Insert(p.Item1, UpdateFactory.CreateView(p.Item2)), _config.DispatcherScheduler, _ignore);
这是我在单独线程上运行的单独代码段中的明确调用。
_ignore.OnNext(new Unit());
Update.Clear();
如果您能帮助我弄清楚,我将不胜感激。
您无法停止"中间插入"。 执行非原子操作时需要锁定数据结构,以确保数据完整性。
lock(someObj)
{
myList.Insert(i, obj);
}
确保以尽可能小的分辨率锁定,即,如果您只需要防止单个非原子操作,则不要锁定整个方法。
或者你是否看过 C# 的线程安全集合,这些集合为你处理了大部分锁定?
执行此操作
时,您将撤消 Rx 为您提供的保证:
.Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))
这就是您遇到错误的原因,因为您正在排队要在 UI 线程中运行的内容,但您的 Rx 管道不知道它,因此您最终会拆分。