等待Observable的第n个OnNext()
本文关键字:OnNext Observable 的第 等待 | 更新日期: 2023-09-27 18:27:51
假设我有一个可观察对象,它多次调用OnNext()
。当OnNext
参数满足某个标准时,我想要await
。我该怎么做?
这几乎就像这篇文章中等待流程执行的例子:
public static TaskAwaiter<int> GetAwaiter(this Process process)
{
var tcs = new TaskCompletionSource<int>();
process.EnableRaisingEvents = true;
process.Exited += (s, e) => tcs.TrySetResult(process.ExitCode);
if (process.HasExited) tcs.TrySetResult(process.ExitCode);
return tcs.Task.GetAwaiter();
}
让我进入代码:我的同事写了一个总线实现,简化如下:
public class MyBus : IObservable<MessageRecieved>
{
public void Post(IBusMessage message)
{
foreach (var observer in this.observers)
{
observer.OnNext(new MessageRecieved(message.GetType(), MessageRecieved.MessageStage.Recieved));
}
message.Handle();
foreach (var observer in this.observers)
{
observer.OnNext(new MessageRecieved(message.GetType(), MessageRecieved.MessageStage.Handled));
}
}
// ... some more stuff
}
发生的情况是,发布了一条消息,它对消息做了一些事情,然后在公交车上发布了一个新消息。因此,一系列的信息被张贴在公交车上,然后被处理。实际的实现有点不同,使用了消息处理程序之类的东西,但这与问题无关。
问题是:我们需要知道它什么时候结束。我知道最后一条消息的消息类型(比如LastMessageType
)。因此,我的想法是让该总线实现IObservable
,并创建一个Observer
,它收集事件,并在捕获到类型为LastMessageType
的消息时引发事件。
然而,我看到Bart de Smet等待一系列随机事件(windows形成事件),并为此创建了一个awaiter。所以我想做的是:
var initialMessage = ConstructInitialMessage();
var bus = new MessageBus();
await bus.Process(initialMessage);
有人能为我指明一个方向,在哪里可以找到解决方案吗。我已经实现了一个观察器,它在OnNext捕捉到LastMessageType
消息的情况下引发一个事件,但它是不可用的。像这样:
public static TaskAwaiter<int> ProcessMessage(this MyBus bus, BeginMessage message)
{
var tcs = new TaskCompletionSource<int>();
var observcer = new MessageObserver();
observcer.LastMessageRecieved += () => tcs.SetResult(1);
return tcs.Task.GetAwaiter();
}
所以我想做的是:
await bus.Process(initialMessage);
您可以使用Rx(在您从项目中引用它之后,添加using System.Reactive.Linq;
)来执行此操作。这样做的目的是等待可观察对象完成(在其观察对象上调用OnCompleted
)。
如果由于某种原因,您无法修复可观察到的内容以正确完成,则必须创建一个新的可观察到内容,该内容应在原始内容结束时结束。为此,可以将FirstAsync()
与谓词一起使用:
await bus.Process(initialMessage).FirstAsync(
message => message.Type == finalType
&& message.Stage == MessageRecieved.MessageStage.Handled);
我已经实现了一个观察者[…],但这并不是徒劳的。
该代码的问题在于,您的代码返回TaskAwaiter<int>
。TaskAwaiter
是不可用的,Task
本身是可用的。因此,如果您将代码更改为仅return tcs.Task;
并更改返回类型,它应该可以工作。