为什么在所有初始订阅者断开连接后 RefCount 不起作用?(重复)
本文关键字:RefCount 连接 不起作用 重复 断开 为什么 | 更新日期: 2023-09-27 18:37:04
按照李·坎贝尔的要求,这是这个原文的后续问题。它旨在在我试图解决的用例的上下文中提出问题。
我有一个包装原始 Web API 并提供令牌管理的WebApiService
。也就是说,它会跟踪身份验证令牌,将其传递到原始 API。下面是WebApiService
中公共方法之一的示例:
public IObservable<User> UpdateUserAsync(int id, UpdateUserRequest request) =>
this
.EnsureAuthenticatedAsync()
.SelectMany(
_ =>
this
.rawWebApi
.UpdateUserAsync(this.TokenValue, id, request));
如您所愿,它只需在转发到原始 Web API 之前调用EnsureAuthenticatedAsync
,并使用 this.TokenValue
传入令牌。
EnsureAuthenticatedAsync
方法如下所示:
public IObservable<Unit> EnsureAuthenticatedAsync() =>
this
.Token
.FirstAsync()
.SelectMany(token => string.IsNullOrWhiteSpace(token) ? this.authenticate : Observable.Return(Unit.Default));
我最初的问题是由我尝试编写身份验证管道(this.authenticate
上面的)引起的。请注意,这是用单个可观察量替换整个EnsureAuthenticatedAsync
方法的第一步。
对于authenticate
,我想要一个可观察的:
- 在有人订阅之前什么都不做(冷/懒)
- 只做一次工作,即使同时有多个订阅者
- 如果所有订阅者断开连接,则再次工作
为此,我想出了这样的东西:
this.authenticate = Observable
.Defer(() =>
Observable
.Create<Unit>(
async (observer, cancellationToken) =>
{
while (!cancellationToken.IsCancellationRequested)
{
var result = await this
.authenticationService
.AuthenticateAsync(this);
if (result.WasSuccessful)
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
return;
}
}
}))
.Publish()
.RefCount();
这里的想法是允许对WebApiService
方法进行任意数量的同时调用,以导致执行单个身份验证循环。一旦通过身份验证,所有订阅者都将完成,任何未来的订阅者都意味着我们需要再次重新进行身份验证,从而重新执行延迟的可观察量。
我最初问题中的简化可观察量存在相同的问题:一旦延迟可观察量完成一次,Publish
将立即完成任何未来的可观察量(尽管延迟可观察量被重新请求)。
如上所述,我的最终目标是将EnsureAuthenticatedAsync
完全替换为仅在令牌为 null 时执行此身份验证的管道。但那是第二步,我失败了:)
最初的问题:有没有办法编写一个管道,无论当前订阅者的数量如何,它都会执行一次,但如果所有订阅者断开连接并再次重新连接,则会再次执行?
可观察序列不能多次完成。此处要删除OnCompleted
调用,以便authenticate
无法多次完成,并将.Take(1)
添加到EnsureAuthenticatedAsync
,以便对authenticate
的订阅将在一个值后完成。
下面是一个工作控制台应用程序。 将对obs1
(具有Take
)的引用替换为obs
以重现您的问题。在这两种情况下,您都可以快速按回车键,让所有四个订阅者获得相同的值。
class Program
{
static int value = 0;
static void Main(string[] args)
{
var obs = Observable.Create<int>(observer =>
{
Console.WriteLine("Generating");
Interlocked.Increment(ref value);
return Observable.Return(value)
.Delay(TimeSpan.FromSeconds(1))
.Subscribe(observer);
})
.Publish()
.RefCount();
var obs1 = obs.Take(1);
obs1.Subscribe(
i => Console.WriteLine("First {0}", i),
() => Console.WriteLine("First complete"));
obs1.Subscribe(
i => Console.WriteLine("Second {0}", i),
() => Console.WriteLine("Second complete"));
Console.ReadLine();
obs1.Subscribe(
i => Console.WriteLine("Third {0}", i),
() => Console.WriteLine("Third complete"));
obs1.Subscribe(
i => Console.WriteLine("Fourth {0}", i),
() => Console.WriteLine("Fourth complete"));
Console.WriteLine("Press enter to exit");
Console.ReadLine();
}
}