等待Observable的第n个OnNext()

本文关键字:OnNext Observable 的第 等待 | 更新日期: 2023-09-27 18:27:51

假设我有一个可观察对象,它多次调用OnNext()。当OnNext参数满足某个标准时,我想要await。我该怎么做?

这几乎就像这篇文章中等待流程执行的例子:

public static TaskAwaiter<int> GetAwaiter(this Process process) 
{ 
    var tcs = new TaskCompletionSource<int>(); 
    process.EnableRaisingEvents = true; 
    process.Exited += (s, e) => tcs.TrySetResult(process.ExitCode); 
    if (process.HasExited) tcs.TrySetResult(process.ExitCode); 
    return tcs.Task.GetAwaiter(); 
}

让我进入代码:我的同事写了一个总线实现,简化如下:

public class MyBus : IObservable<MessageRecieved>
{
    public void Post(IBusMessage message)
    {
        foreach (var observer in this.observers)
        {
            observer.OnNext(new MessageRecieved(message.GetType(), MessageRecieved.MessageStage.Recieved));
        }
        message.Handle();
        foreach (var observer in this.observers)
        {
            observer.OnNext(new MessageRecieved(message.GetType(), MessageRecieved.MessageStage.Handled));
        }
    }
    // ... some more stuff
}

发生的情况是,发布了一条消息,它对消息做了一些事情,然后在公交车上发布了一个新消息。因此,一系列的信息被张贴在公交车上,然后被处理。实际的实现有点不同,使用了消息处理程序之类的东西,但这与问题无关。

问题是:我们需要知道它什么时候结束。我知道最后一条消息的消息类型(比如LastMessageType)。因此,我的想法是让该总线实现IObservable,并创建一个Observer,它收集事件,并在捕获到类型为LastMessageType的消息时引发事件。

然而,我看到Bart de Smet等待一系列随机事件(windows形成事件),并为此创建了一个awaiter。所以我想做的是:

var initialMessage = ConstructInitialMessage();
var bus = new MessageBus();
await bus.Process(initialMessage);

有人能为我指明一个方向,在哪里可以找到解决方案吗。我已经实现了一个观察器,它在OnNext捕捉到LastMessageType消息的情况下引发一个事件,但它是不可用的。像这样:

public static TaskAwaiter<int> ProcessMessage(this MyBus bus, BeginMessage message)
{
    var tcs = new TaskCompletionSource<int>();
    var observcer = new MessageObserver();
    observcer.LastMessageRecieved += () => tcs.SetResult(1);
    return tcs.Task.GetAwaiter();
}

等待Observable的第n个OnNext()

所以我想做的是:

await bus.Process(initialMessage);

可以使用Rx(在您从项目中引用它之后,添加using System.Reactive.Linq;)来执行此操作。这样做的目的是等待可观察对象完成(在其观察对象上调用OnCompleted)。

如果由于某种原因,您无法修复可观察到的内容以正确完成,则必须创建一个新的可观察到内容,该内容应在原始内容结束时结束。为此,可以将FirstAsync()与谓词一起使用:

await bus.Process(initialMessage).FirstAsync(
    message => message.Type == finalType 
        && message.Stage == MessageRecieved.MessageStage.Handled);

我已经实现了一个观察者[…],但这并不是徒劳的。

该代码的问题在于,您的代码返回TaskAwaiter<int>TaskAwaiter是不可用的,Task本身是可用的。因此,如果您将代码更改为仅return tcs.Task;并更改返回类型,它应该可以工作。