使用响应式框架观察线程安全集合

本文关键字:线程 安全 集合 观察 框架 响应 | 更新日期: 2023-09-27 18:33:44

这将归结为一个相当高级的问题。我有一个大型数据集,需要(独立(验证每一行。我想使用 Parallel.Foreach,并在该行上调用验证方法。假定此方法是线程安全的。如果验证返回错误,我需要使用此错误更新数据行。我显然无法从后台线程执行此操作。但是,我不确定实现此错误处理的最佳方法是什么。我实现这一点的想法是将行 ID 和错误存储在 BlockingCollection 中,因为这对于写入是线程安全的。然后我将不断轮询(从后台线程(,当后台线程找到数据时,调用表单并更新当前行。

我想知道是否有更简单的方法可以做到这一点,使用反应式框架?基本上,我需要一个多生产者线程安全集合,可以"观察"该集合,当将新值添加到集合中时,"OnNext"将在主线程上执行 - 这可能吗?理想情况下,我还可以控制这种情况发生的频率(例如每 2-3 秒一次,这样每 2-3 秒主线程上的回调就会更新多行(,所以我不会经常调用主线程。

谢谢你的时间。

使用响应式框架观察线程安全集合

这如何打动你的幻想:

IObservable<bool> ValidateAsync(Row item)
{
    return Observable.Start(() => {
        // TODO: Figure out if the row is valid
        return true;
    }, Scheduler.TaskPoolScheduler);
}
myBigDataTable.ToObservable()
    .Select(x => ValidateAsync(x).Select(y => new { Row = x, IsValid = y }))
    .Merge(10 /* rows concurrently */)
    .ObserveOn(SynchronizationContext.Current /*assuming WinForms */)
    .Subscribe(x => {
        Console.WriteLine("Row {0} validity: {1}", x.Row, x.IsValid);
    });
没有锁,没有

愚蠢的容器,没有阻塞,100%线程安全。