反应式扩展:如何观察 IEnumerable 方法结果异步

本文关键字:IEnumerable 方法 结果 异步 观察 扩展 何观察 反应式 | 更新日期: 2023-09-27 18:37:00

我有一个返回我的业务对象的 IEnumerable 的方法。在此方法中,我将大型文本文件的内容解析为业务对象模型。里面没有线程的东西。

在我的视图模型 (WPF) 中,我需要存储和显示方法的结果。存储是一个可观察集合。

这里是可观察的代码:

private void OpenFile(string file)
{
    _parser = new IhvParser();
    App.Messenger.NotifyColleagues(Actions.ReportContentInfo, new Model.StatusInfoDisplayDTO { Information = "Lade Daten...", Interval = 0 });
    _ihvDataList.Clear();
    var obs = _parser.ParseDataObservable(file)
                     .ToObservable(NewThreadScheduler.Default)
                     .ObserveOnDispatcher()
                     .Subscribe<Ihv>(AddIhvToList, ReportError, ReportComplete);
}
private void ReportComplete()
{
    App.Messenger.NotifyColleagues(Actions.ReportContentInfo, new Model.StatusInfoDisplayDTO { Information = "Daten fertig geladen.", Interval = 3000 });
    RaisePropertyChanged(() => IhvDataList);
}
private void ReportError(Exception ex)
{
    MessageBox.Show("...");
}
private void AddIhvToList(Ihv ihv)
{
    _ihvDataList.Add(ihv);
}

这是解析器代码:

public IEnumerable<Model.Ihv> ParseDataObservable(string file)
{
    using (StreamReader reader = new StreamReader(file))
    {
        var head = reader.ReadLine(); //erste Zeile ist Kopfinformation
        if (!head.Contains("BayBAS") || !head.Contains("2.3.0"))
        {
            _logger.ErrorFormat("Die Datei {0} liegt nicht im BayBAS-Format 2.3.0 vor.");
        }
        else
        {
            while (!reader.EndOfStream)
            {
                var line = reader.ReadLine();
                if (line.Length != 1415)
                {
                    _logger.ErrorFormat("Die Datei {0} liegt nicht im BayBAS-Format 2.3.0 vor.");
                    break;
                }
                var tempIhvItem = Model.Ihv.Parse(line);
                yield return tempIhvItem;
            }
            reader.Close();
        }
    }
}

为什么我没有异步获取结果?在 DataGrid 中看到结果之前,将解析并传递所有项目。

有人可以帮忙吗?

安德烈亚斯

反应式扩展:如何观察 IEnumerable 方法结果异步

你确定这不是异步发生的吗? 您是根据您在 UI 中感知到的内容假设这一点,还是设置了断点并确定事实确实如此?

请注意,WPF 的Dispatcher使用优先级队列,DispatcherSchedulerNormal优先级计划项,优先级高于用于输入、布局和呈现的优先级。 如果结果来得足够快,则 UI 可能要等到处理完最后一个结果后才会更新:调度程序可能忙于处理结果,无法执行 UI 的布局和呈现。

您可以尝试重写DispatcherScheduler的行为以自定义优先级进行计划,如下所示:

public class PriorityDispatcherScheduler : DispatcherScheduler
{
    private readonly DispatcherPriority _priority;
    public PriorityDispatcherScheduler(DispatcherPriority priority)
        : this(priority, Dispatcher.CurrentDispatcher) {}
    public PriorityDispatcherScheduler(DispatcherPriority priority, Dispatcher dispatcher)
        : base(dispatcher)
    {
        _priority = priority;
    }
    public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
    {
        if (action == null)
            throw new ArgumentNullException("action");
        var d = new SingleAssignmentDisposable();
        this.Dispatcher.BeginInvoke(
            _priority,
            (Action)(() =>
                     {
                         if (d.IsDisposed)
                             return;
                         d.Disposable = action(this, state);
                     }));
        return d;
    }
}

然后通过将ObserveOnDispatcher()替换为ObserveOn(new PriorityDispatcherScheduler(p))来修改可观察序列,其中p是适当的优先级(例如,Background )。

此外,这看起来非常可疑:ToObservable(NewThreadScheduler.Default) . 我相信这将导致每次结果出现时创建一个新线程,其唯一目的是将其传递给调度程序,之后新线程将终止。 这几乎肯定不是你的意图。 我假设您只是想在单独的线程上处理文件;正如所写的那样,如果您的IEnumerable产生 1,000 个项目,您的代码实际上最终会创建 1,000 个短期线程,而这些项目实际上都不会执行读取文件的工作。

最后,是否在调度程序线程上调用OpenFile()? 如果是这样,我相信将要发生的事情如下:

  1. 调度程序(在UI线程上)将调用Subscribe(),它将处理可观察运算符的链一直回到ParseDataObservable(file)
  2. 调度程序将循环访问您的IEnumerable序列,将每个结果触发到ToObservable()创建的可观察序列中。
  3. 传递
  4. 到可观察序列的每个结果都将被安排在调度程序(当前正在运行的同一调度程序)上传递。

如果是这种情况,那么在任何结果传递给AddIhvToList()之前,将读取整个文件,因为调度程序被束缚在读取文件上,并且在完成之前不会处理其队列中的结果。 如果发生这种情况,您可以尝试更改代码,如下所示:

var obs = _parser.ParseDataObservable(file)
                 .ToObservable()
                 .SubscribeOn(/*NewThread*/Scheduler.Default)
                 .ObserveOnDispatcher() // consider using PriorityDispatcherScheduler
                 .Subscribe<Ihv>(AddIhvToList, ReportError, ReportComplete);

注入SubscribeOn()应确保IEnumerable的迭代(即文件的读取)发生在单独的线程上。 Scheduler.Default在这里应该足够了,但如果你真的需要,你可以使用NewThreadScheduler(你可能不需要)。 在设置完所有内容后,调度程序线程将从Subscribe()返回,释放它以继续处理其队列,即在结果进入时将结果传递给AddIhvToList()。 这应该为您提供所需的异步行为。

相关文章: