如何创建可重复的 IObservable 资源实例

本文关键字:IObservable 资源 实例 何创建 创建 | 更新日期: 2023-09-27 18:35:39

需要一些关于RX的帮助。我想定义可观察的,它应该在创建第一个订阅时创建资源,为每个新订阅发布一次此资源实例,并且当所有订阅完成后,必须释放该资源实例。类似于 Observable.Using,但具有 Publish(value) 和 RefCount 行为。我所有使用标准运算符表达它的尝试都失败了。这段代码可以做我想做的,但我认为必须有标准的方法来实现。我真的不想重新发明轮子。

using System;
using System.Reactive.Linq;
using System.Reactive.Disposables;
namespace ConsoleApplication1
{
    class Program
    {
        static void Main()
        {
            // this part is what i can't express in standart RX operators..
            Res res = null;
            RefCountDisposable disp = null;
            var @using = Observable.Create<Res>(obs =>
                {
                    res = res ?? new Res();
                    disp = disp == null || disp.IsDisposed ? new RefCountDisposable(res) : disp;
                    obs.OnNext(res);
                    return new CompositeDisposable(disp.GetDisposable(), disp, Disposable.Create(() => res = null));
                });
            // end
            var sub1 = @using.Subscribe(Print);
            var sub2 = @using.Subscribe(Print);
            sub1.Dispose();
            sub2.Dispose();
            sub1 = @using.Subscribe(Print);
            sub2 = @using.Subscribe(Print);
            sub1.Dispose();
            sub2.Dispose();
            Console.ReadKey();
        }
        static void Print(object o)
        {
            Console.WriteLine(o.GetHashCode());
        }
    }
    class Res : IDisposable
    {
        public Res()
        {
            Console.WriteLine("CREATED");
        }
        public void Dispose()
        {
            Console.WriteLine("DISPOSED");
        }
    }
}

输出:

CREATED
1111
1111
DISPOSED
CREATED
2222
2222
DISPOSED

我对标准运算符的"最佳"尝试:

var @using = Observable.Using(() => new Res(), res => Observable.Never(res).StartWith(res))
     .Replay(1)
     .RefCount();

输出为:

CREATED
1111
1111
DISPOSED
CREATED
1111  <-- this is "wrong" value
2222
2222
DISPOSED

谢谢!

对不起,我的英语不好=(

如何创建可重复的 IObservable 资源实例

在有点头疼之后,我终于意识到Using.Replay.RefCount的问题在于Replay内部使用单个ReplaySubject实例调用Multicast,但在我的特定情况下,我需要Replay在每个新的首次订阅上重新创建主题。通过谷歌,我找到了RXX库,它ReconnectableObservable就是答案。它使用主题工厂而不是主题实例在每次Connect调用中重新创建主题(原始 rxx 代码,根本没有合约):

internal sealed class ReconnectableObservable<TSource, TResult> : IConnectableObservable<TResult>
{
    private ISubject<TSource, TResult> Subject
    {
        get { return _subject ?? (_subject = _factory()); }
    }
    private readonly object _gate = new object();
    private readonly IObservable<TSource> _source;
    private readonly Func<ISubject<TSource, TResult>> _factory;
    private ISubject<TSource, TResult> _subject;
    private IDisposable _subscription;
    public ReconnectableObservable(IObservable<TSource> source, Func<ISubject<TSource, TResult>> factory)
    {
        _source = source;
        _factory = factory;
    }
    public IDisposable Connect()
    {
        lock (_gate)
        {
            if (_subscription != null)
                return _subscription;
            _subscription = new CompositeDisposable(
                _source.Subscribe(Subject),
                Disposable.Create(() =>
                {
                    lock (_gate)
                    {
                        _subscription = null;
                        _subject = null;
                    }
                }));
            return _subscription;
        }
    }
    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        lock (_gate)
        {
            return Subject.Subscribe(observer);
        }
    }
}

和一些扩展方法:

public static class Ext
{
    public static IConnectableObservable<T> Multicast<T>(this IObservable<T> obs, Func<ISubject<T>> subjectFactory)
    {
        return new ReconnectableObservable<T, T>(obs, subjectFactory);
    }
    public static IConnectableObservable<T> ReplayReconnect<T>(this IObservable<T> obs, int replayCount)
    {
        return obs.Multicast(() => new ReplaySubject<T>(replayCount));
    }
    public static IConnectableObservable<T> PublishReconnect<T>(this IObservable<T> obs)
    {
        return obs.Multicast(() => new Subject<T>());
    }
}

使用该代码,现在我可以这样做:

var @using = Observable
             .Using(() => new Res(), _ => Observable.Never(_).StartWith(_))
             .ReplayReconnect(1) // <-- that's it!
             .RefCount();

雅虎!它按预期工作。

感谢所有回答的人!你把我推向了正确的方向。

试试这个:

var @using = Observable.Using(
        () => new Res(),
        res => Observable.Return(res).Concat(Observable.Never<Res>()))
    .Publish((Res)null)
    .RefCount()
    .SkipWhile(res => res == null);

Concat可防止观察者在可观察量产生其唯一值时自动取消订阅。

如果有办法使用标准运算符来做到这一点,我看不到它。

问题在于,标准运算符没有"当有订阅者时才缓存值"选项。

无论订阅者如何,重播运算符都将缓存最后一个值,这是您看到的"错误"值的根本原因。

它强调了这样一个事实,即使用 + 重播是一个危险的组合,因为它发出了一个释放的值。

我怀疑,如果有人确实使用标准运算符管理了一些魔法,它就不会像您的 Observable.Create 实现那样可读。

我已经多次使用Observable.Create来创建代码,我确信这些代码比使用标准运算符的等效构造更简洁,可读和可维护。

我的建议是,使用 Observable.Create 绝对没有错 - 将代码包装在一个接受资源的漂亮工厂方法中,您就可以开始了。这是我这样做的尝试,它只是添加了线程安全性的代码的重新打包:

public static IObservable<T> CreateObservableRefCountedResource<T>(Func<T> resourceFactory)
    where T : class, IDisposable
{
    T resource = null;
    RefCountDisposable resourceDisposable = null;
    var gate = new object();
    return Observable.Create<T>(o =>
        {
            lock (gate)
            {
                resource = resource ?? resourceFactory();
                var disposeAction = Disposable.Create(() =>
                    {
                        lock (gate)
                        {
                            resource.Dispose();
                            resource = null;
                        }
                    });
                resourceDisposable = (resourceDisposable == null || resourceDisposable.IsDisposed)
                                         ? new RefCountDisposable(disposeAction)
                                         : resourceDisposable;
                o.OnNext(resource);
                return new CompositeDisposable(
                    resourceDisposable,
                    resourceDisposable.GetDisposable());
            }
        });
}

已编辑 - 忘记调用资源Disposable.GetDisposable()!