订阅/重新订阅可观察流

本文关键字:观察 新订阅 订阅 | 更新日期: 2023-09-27 18:33:00

>我遇到了一个问题,我想在谓词为真时订阅可观察的流,并在谓词为假时停止订阅。当将来某个时间点的谓词再次为 true 时,它应该重新订阅可观察流。

用例:

我将我的可观察流作为输入(IObservable<IList<LogEntity>> items(,如果我无法将日志实体插入数据库,它应该取消订阅流,当数据库备份运行时,它应该自动订阅流(基于属性IsSubscribed(并开始插入数据。

我自己的尝试:

我已经尝试了以下不起作用的方法:

var groups = from dataItem in items.SelectMany(o => o.GroupBy(i => i.EntityType))
    where dataItem.Any()
    select new {Type = dataItem.Key, List = dataItem.Select(o => o)};
groups
    .TakeWhile(o => IsSubscribed)
    .SubscribeOn(_scheduler)
    .Repeat()
    .Subscribe(o => Insert(o.Type, o.List));

根据属性IsSubscribed,我想流式传输订阅和取消订阅。当TakeWhile为真时,OnCompleted会被调用,当Subscribe之后不起作用时。旁注:这是一个冷的可观测流

问题:

如何创建一个可观察的流,我可以在其中根据需要多次订阅和取消订阅(有点像 C# 中的事件处理程序(

提前感谢您的帮助

订阅/重新订阅可观察流

看起来像一个重复的问题。

但是,从冷 IObservable 上的暂停和恢复订阅中提取代码,可以将其调整为

var subscription = Observable.Create<IObservable<YourType>>(o =>
{
    var current = groups.Replay();
    var connection = new SerialDisposable();
    connection.Disposable = current.Connect();
    return IsSubscribed
        .DistinctUntilChanged()
        .Select(isRunning =>
        {
            if (isRunning)
            {
                //Return the current replayed values.
                return current;
            }
            else
            {
                //Disconnect and replace current.
                current = source.Replay();
                connection.Disposable = current.Connect();
                //yield silence until the next time we resume.
                return Observable.Never<YourType>();
            }
        })
        .Subscribe(o);
})
.Switch()
.Subscribe(o => Insert(o.Type, o.List));

你可以看到马特·巴雷特(和我(在这里谈论它。我建议观看整个视频(可能以 2 倍的速度(以获得完整的上下文。

你想要的是添加 组 .延迟(群.SelectMany(WaitForDatabaseUp((

public async Task WaitForDatabaseUp()
{
    //If IsSubscribed continue execution
    if(IsSubscribed) return;
    //Else wait until IsSubscribed == true
    await this.ObservableForProperty(x => x.IsSubscribed, skipInitial: false)
                       .Value()
                       .Where(isSubscribed => isSubscribed)
                       .Take(1);
}

使用您喜欢的框架将 INPC 转换为可观察量,您可以在其中看到 ObserveProperty((

基本上,我们内联了一个仅在IsSubscribed == true时返回的任务,然后将该任务转换为可观察量,以便与Rx兼容。