如何根据事件中的条件来完成一个Rx Observable

本文关键字:一个 Observable Rx 事件 何根 条件 | 更新日期: 2023-09-27 18:14:40

我有一个我无法控制的事件,它为我提供了数据。eventArgs看起来像这样:

class MyEventArg {
  bool IsLastItem {get;}
  Data DataItem {get;}
}

我使用Rx将这个事件转换为IObservable。但是如果IsLastItem为真,我想完成这个可观察对象。

有什么好主意吗?一种方法是将数据通过一个主题,我有更多的控制设置OnComplete事件,如果条件发生…

如何根据事件中的条件来完成一个Rx Observable

如果您希望包含最后一个元素,您可以将仅包含最后一个元素的流与包含TakeWhile的常规流合并在一起。下面是一个简单的控制台应用程序来证明这一点:

var subject = new List<string>
{                            
"test",
"last"
}.ToObservable();
var my = subject
            .Where(x => x == "last").Take(1)
            .Merge(subject.TakeWhile(x => x != "last"));
my.Subscribe(
    o => Console.WriteLine("On Next: " + o), 
    () => Console.WriteLine("Completed"));
Console.ReadLine();
这个打印

:

On Next: test
On Next: last
Completed

有一个bug,如果底层的可观察对象实际上没有完成,就会抑制OnCompleted消息。我更正了代码,以确保OnCompleted被称为

如果你想避免为冷可观察对象多次订阅底层序列,你可以这样重构代码:

var my = subject.Publish(p => p
            .Where(x => x == "last").Take(1)
            .Merge(p.TakeWhile(x => x != "last")));
public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
                                                   {
                                                       o.OnNext(x);
                                                       if (!predicate(x))
                                                           o.OnCompleted();
                                                   },
                                               o.OnError,
                                               o.OnCompleted
                                  ));
}

你在找这样的东西吗?

IObservable<MyEventArg> result =
    myEventArgObservable.TakeWhile(arg => !arg.IsLastItem);