无法中断对流中的列表的批量更新(插入)

本文关键字:更新 插入 列表 中断 对流 | 更新日期: 2023-09-27 18:34:41

编辑:为了简化事情,这里有一个范式:我有一个项目列表,它由连续流不断更新。我时不时地会得到一个新的数据快照,用于重新初始化流。因此,如果在我想重新初始化时发生任何更新,我需要确保这些更新停止并使用新快照。

我正在处理许多需要显示到 UI 的连续更新数据流。更新需要以相反的顺序显示,即最新的更新位于列表顶部。为了在顶部显示结果,我必须插入到列表中。我遇到的问题是有时列表需要休息(即 List.Clear(,但是,如果我在中间插入,我需要停止它,因为否则插入会导致异常。

我已经整理了一种反应式方法来帮助我解决这个问题,但是,它似乎忽略了我的直到流。

public static IObservable<T> BufferAndDispatchUntil<T, TStopUnit>(
                             this IObservable<T> source, 
                             Action<T> onNext, 
                             IScheduler scheduler, 
                             IObservable<TStopUnit> until, 
                             DispatcherPriority dispatcherPriority = DispatcherPriority.Background)
{
        if (source == null) throw new ArgumentNullException("source");
        if (onNext == null) throw new ArgumentNullException("onNext");
        if (Application.Current == null)
            return source.Do(onNext);
        var dispatcher = Application.Current.Dispatcher;
        return source
            .LazyBuffer(BufferTime, BufferCount, scheduler)
            .TakeUntil(until)
            .Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))
            .SelectMany(i => i);
}

LazyBuffer 是 Buffer 的自定义实现,它仅在新项可用时返回结果集,而不是在指定的时间间隔内返回空结果集。这就是我调用它的方式,如上所述吹。

BufferAndDispatchUntil(p => Update.Insert(p.Item1, UpdateFactory.CreateView(p.Item2)), _config.DispatcherScheduler, _ignore);

这是我在单独线程上运行的单独代码段中的明确调用。

_ignore.OnNext(new Unit());  
Update.Clear();

如果您能帮助我弄清楚,我将不胜感激。

无法中断对流中的列表的批量更新(插入)

您无法停止"中间插入"。 执行非原子操作时需要锁定数据结构,以确保数据完整性。

lock(someObj)
{
    myList.Insert(i, obj);
}

确保以尽可能小的分辨率锁定,即,如果您只需要防止单个非原子操作,则不要锁定整个方法。

或者你是否看过 C# 的线程安全集合,这些集合为你处理了大部分锁定?

执行此操作

时,您将撤消 Rx 为您提供的保证:

.Do(b => dispatcher.BeginInvoke(() => b.ForEach(onNext), dispatcherPriority))

这就是您遇到错误的原因,因为您正在排队要在 UI 线程中运行的内容,但您的 Rx 管道不知道它,因此您最终会拆分。