如何阻塞直到事件发生在RX流中
本文关键字:RX 流中 事件 何阻塞 | 更新日期: 2023-09-27 18:18:36
与微软响应扩展(RX),我想知道是否有可能阻止,直到事件发生?
像这样:
observableStream.BlockUntilTrue(o => o.MyProperty == true);
What I have try
我已经尝试过observableStream.TakeUntil(o => o.MyProperty == true);
,但是这个立即退出。
看了你的评论后,我重新写了我的回答。在您的情况下,您可以使用First
,但它将RX的异步特性更改为阻塞的同步代码。我想这就是你的问题所在。
var firstValue = observableStream.
.Where(o => o.MyProperty)
.First();
对First
的调用将阻塞并等待第一个值从可观察序列到达,这似乎是你想要的。
这个演示代码运行良好。它添加了一个扩展方法,该方法阻塞直到流上发生单个事件。如果将超时添加到BlockingCollection的Take()中,则它将等待,直到事件发生或超时发生。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RX_2
{
public static class Program
{
static void Main(string[] args)
{
Subject<bool> stream = new Subject<bool>();
Task.Run(
() =>
{
for (int i = 0; i < 4; i++)
{
Thread.Sleep(TimeSpan.FromMilliseconds(500));
stream.OnNext(false);
}
stream.OnNext(true);
});
Console.Write("Start'n");
stream.Where(o => o == true).BlockUntilEvent();
Console.Write("Stop'n");
Console.ReadKey();
}
public static void BlockUntilEvent(this IObservable<bool> stream)
{
BlockingCollection<bool> blockingCollection = new BlockingCollection<bool>();
stream.Subscribe(blockingCollection.Add);
var result = blockingCollection.Take();
}
}
}