如何阻塞直到事件发生在RX流中

本文关键字:RX 流中 事件 何阻塞 | 更新日期: 2023-09-27 18:18:36

与微软响应扩展(RX),我想知道是否有可能阻止,直到事件发生?

像这样:

observableStream.BlockUntilTrue(o => o.MyProperty == true);

What I have try

我已经尝试过observableStream.TakeUntil(o => o.MyProperty == true);,但是这个立即退出。

如何阻塞直到事件发生在RX流中

看了你的评论后,我重新写了我的回答。在您的情况下,您可以使用First,但它将RX的异步特性更改为阻塞的同步代码。我想这就是你的问题所在。

var firstValue = observableStream.
  .Where(o => o.MyProperty)
  .First();

First的调用将阻塞并等待第一个值从可观察序列到达,这似乎是你想要的。

这个演示代码运行良好。它添加了一个扩展方法,该方法阻塞直到流上发生单个事件。如果将超时添加到BlockingCollection的Take()中,则它将等待,直到事件发生或超时发生。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RX_2
{
    public static class Program
    {
        static void Main(string[] args)
        {
            Subject<bool> stream = new Subject<bool>();
            Task.Run(
                () =>
                {
                    for (int i = 0; i < 4; i++)
                    {
                        Thread.Sleep(TimeSpan.FromMilliseconds(500));
                        stream.OnNext(false);
                    }
                    stream.OnNext(true);
                });
            Console.Write("Start'n");
            stream.Where(o => o == true).BlockUntilEvent();
            Console.Write("Stop'n");
            Console.ReadKey();
        }
        public static void BlockUntilEvent(this IObservable<bool> stream)
        {
            BlockingCollection<bool> blockingCollection = new BlockingCollection<bool>();
            stream.Subscribe(blockingCollection.Add);
            var result = blockingCollection.Take();
        }
    }
}