如何创建可重复的 IObservable 资源实例
本文关键字:IObservable 资源 实例 何创建 创建 | 更新日期: 2023-09-27 18:35:39
需要一些关于RX的帮助。我想定义可观察的,它应该在创建第一个订阅时创建资源,为每个新订阅发布一次此资源实例,并且当所有订阅完成后,必须释放该资源实例。类似于 Observable.Using,但具有 Publish(value) 和 RefCount 行为。我所有使用标准运算符表达它的尝试都失败了。这段代码可以做我想做的,但我认为必须有标准的方法来实现。我真的不想重新发明轮子。
using System;
using System.Reactive.Linq;
using System.Reactive.Disposables;
namespace ConsoleApplication1
{
class Program
{
static void Main()
{
// this part is what i can't express in standart RX operators..
Res res = null;
RefCountDisposable disp = null;
var @using = Observable.Create<Res>(obs =>
{
res = res ?? new Res();
disp = disp == null || disp.IsDisposed ? new RefCountDisposable(res) : disp;
obs.OnNext(res);
return new CompositeDisposable(disp.GetDisposable(), disp, Disposable.Create(() => res = null));
});
// end
var sub1 = @using.Subscribe(Print);
var sub2 = @using.Subscribe(Print);
sub1.Dispose();
sub2.Dispose();
sub1 = @using.Subscribe(Print);
sub2 = @using.Subscribe(Print);
sub1.Dispose();
sub2.Dispose();
Console.ReadKey();
}
static void Print(object o)
{
Console.WriteLine(o.GetHashCode());
}
}
class Res : IDisposable
{
public Res()
{
Console.WriteLine("CREATED");
}
public void Dispose()
{
Console.WriteLine("DISPOSED");
}
}
}
输出:
CREATED
1111
1111
DISPOSED
CREATED
2222
2222
DISPOSED
我对标准运算符的"最佳"尝试:
var @using = Observable.Using(() => new Res(), res => Observable.Never(res).StartWith(res))
.Replay(1)
.RefCount();
输出为:
CREATED
1111
1111
DISPOSED
CREATED
1111 <-- this is "wrong" value
2222
2222
DISPOSED
谢谢!
对不起,我的英语不好=(
在有点头疼之后,我终于意识到Using.Replay.RefCount
的问题在于Replay
内部使用单个ReplaySubject
实例调用Multicast
,但在我的特定情况下,我需要Replay
在每个新的首次订阅上重新创建主题。通过谷歌,我找到了RXX库,它ReconnectableObservable
就是答案。它使用主题工厂而不是主题实例在每次Connect
调用中重新创建主题(原始 rxx 代码,根本没有合约):
internal sealed class ReconnectableObservable<TSource, TResult> : IConnectableObservable<TResult>
{
private ISubject<TSource, TResult> Subject
{
get { return _subject ?? (_subject = _factory()); }
}
private readonly object _gate = new object();
private readonly IObservable<TSource> _source;
private readonly Func<ISubject<TSource, TResult>> _factory;
private ISubject<TSource, TResult> _subject;
private IDisposable _subscription;
public ReconnectableObservable(IObservable<TSource> source, Func<ISubject<TSource, TResult>> factory)
{
_source = source;
_factory = factory;
}
public IDisposable Connect()
{
lock (_gate)
{
if (_subscription != null)
return _subscription;
_subscription = new CompositeDisposable(
_source.Subscribe(Subject),
Disposable.Create(() =>
{
lock (_gate)
{
_subscription = null;
_subject = null;
}
}));
return _subscription;
}
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
lock (_gate)
{
return Subject.Subscribe(observer);
}
}
}
和一些扩展方法:
public static class Ext
{
public static IConnectableObservable<T> Multicast<T>(this IObservable<T> obs, Func<ISubject<T>> subjectFactory)
{
return new ReconnectableObservable<T, T>(obs, subjectFactory);
}
public static IConnectableObservable<T> ReplayReconnect<T>(this IObservable<T> obs, int replayCount)
{
return obs.Multicast(() => new ReplaySubject<T>(replayCount));
}
public static IConnectableObservable<T> PublishReconnect<T>(this IObservable<T> obs)
{
return obs.Multicast(() => new Subject<T>());
}
}
使用该代码,现在我可以这样做:
var @using = Observable
.Using(() => new Res(), _ => Observable.Never(_).StartWith(_))
.ReplayReconnect(1) // <-- that's it!
.RefCount();
雅虎!它按预期工作。
感谢所有回答的人!你把我推向了正确的方向。
试试这个:
var @using = Observable.Using(
() => new Res(),
res => Observable.Return(res).Concat(Observable.Never<Res>()))
.Publish((Res)null)
.RefCount()
.SkipWhile(res => res == null);
Concat
可防止观察者在可观察量产生其唯一值时自动取消订阅。
如果有办法使用标准运算符来做到这一点,我看不到它。
问题在于,标准运算符没有"仅当有订阅者时才缓存值"选项。
无论订阅者如何,重播运算符都将缓存最后一个值,这是您看到的"错误"值的根本原因。
它强调了这样一个事实,即使用 + 重播是一个危险的组合,因为它发出了一个释放的值。
我怀疑,如果有人确实使用标准运算符管理了一些魔法,它就不会像您的 Observable.Create 实现那样可读。
我已经多次使用Observable.Create来创建代码,我确信这些代码比使用标准运算符的等效构造更简洁,可读和可维护。
我的建议是,使用 Observable.Create 绝对没有错 - 将代码包装在一个接受资源的漂亮工厂方法中,您就可以开始了。这是我这样做的尝试,它只是添加了线程安全性的代码的重新打包:
public static IObservable<T> CreateObservableRefCountedResource<T>(Func<T> resourceFactory)
where T : class, IDisposable
{
T resource = null;
RefCountDisposable resourceDisposable = null;
var gate = new object();
return Observable.Create<T>(o =>
{
lock (gate)
{
resource = resource ?? resourceFactory();
var disposeAction = Disposable.Create(() =>
{
lock (gate)
{
resource.Dispose();
resource = null;
}
});
resourceDisposable = (resourceDisposable == null || resourceDisposable.IsDisposed)
? new RefCountDisposable(disposeAction)
: resourceDisposable;
o.OnNext(resource);
return new CompositeDisposable(
resourceDisposable,
resourceDisposable.GetDisposable());
}
});
}
已编辑 - 忘记调用资源Disposable.GetDisposable()!