为什么在所有初始订阅者断开连接后 RefCount 不起作用?(重复)

本文关键字:RefCount 连接 不起作用 重复 断开 为什么 | 更新日期: 2023-09-27 18:37:04

按照李·坎贝尔的要求,这是这个原文的后续问题。它旨在在我试图解决的用例的上下文中提出问题。

我有一个包装原始 Web API 并提供令牌管理的WebApiService。也就是说,它会跟踪身份验证令牌,将其传递到原始 API。下面是WebApiService中公共方法之一的示例:

public IObservable<User> UpdateUserAsync(int id, UpdateUserRequest request) =>
    this
        .EnsureAuthenticatedAsync()
        .SelectMany(
            _ =>
                this
                    .rawWebApi
                    .UpdateUserAsync(this.TokenValue, id, request));

如您所愿,它只需在转发到原始 Web API 之前调用EnsureAuthenticatedAsync,并使用 this.TokenValue 传入令牌。

EnsureAuthenticatedAsync方法如下所示:

public IObservable<Unit> EnsureAuthenticatedAsync() =>
    this
        .Token
        .FirstAsync()
        .SelectMany(token => string.IsNullOrWhiteSpace(token) ? this.authenticate : Observable.Return(Unit.Default));

最初的问题是由我尝试编写身份验证管道(this.authenticate上面的)引起的。请注意,这是用单个可观察量替换整个EnsureAuthenticatedAsync方法的第一步。

对于authenticate,我想要一个可观察的:

  1. 在有人订阅之前什么都不做(冷/懒)
  2. 只做一次工作,即使同时有多个订阅者
  3. 如果所有订阅者断开连接,则再次工作

为此,我想出了这样的东西:

this.authenticate = Observable
    .Defer(() =>
        Observable
            .Create<Unit>(
                async (observer, cancellationToken) =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var result = await this
                            .authenticationService
                            .AuthenticateAsync(this);
                        if (result.WasSuccessful)
                        {
                            observer.OnNext(Unit.Default);
                            observer.OnCompleted();
                            return;
                        }
                    }
                }))
    .Publish()
    .RefCount();

这里的想法是允许对WebApiService方法进行任意数量的同时调用,以导致执行单个身份验证循环。一旦通过身份验证,所有订阅者都将完成,任何未来的订阅者都意味着我们需要再次重新进行身份验证,从而重新执行延迟的可观察量。

当然,上述可观察量与

我最初问题中的简化可观察量存在相同的问题:一旦延迟可观察量完成一次,Publish将立即完成任何未来的可观察量(尽管延迟可观察量被重新请求)。

如上所述,我的最终目标是将EnsureAuthenticatedAsync完全替换为仅在令牌为 null 时执行此身份验证的管道。但那是第二步,我失败了:)

因此,回到

最初的问题:有没有办法编写一个管道,无论当前订阅者的数量如何,它都会执行一次,但如果所有订阅者断开连接并再次重新连接,则会再次执行?

为什么在所有初始订阅者断开连接后 RefCount 不起作用?(重复)

可观察序列不能多次完成。此处要删除OnCompleted调用,以便authenticate无法多次完成,并将.Take(1)添加到EnsureAuthenticatedAsync,以便对authenticate订阅将在一个值后完成。

下面是一个工作控制台应用程序。 将对obs1(具有Take)的引用替换为obs以重现您的问题。在这两种情况下,您都可以快速按回车键,让所有四个订阅者获得相同的值。

class Program
{
    static int value = 0;
    static void Main(string[] args)
    {
        var obs = Observable.Create<int>(observer =>
        {
            Console.WriteLine("Generating");
            Interlocked.Increment(ref value);
            return Observable.Return(value)
                .Delay(TimeSpan.FromSeconds(1))
                .Subscribe(observer);
        })
        .Publish() 
        .RefCount();
        var obs1 = obs.Take(1);
        obs1.Subscribe(
            i => Console.WriteLine("First {0}", i), 
            () => Console.WriteLine("First complete"));
        obs1.Subscribe(
            i => Console.WriteLine("Second {0}", i), 
            () => Console.WriteLine("Second complete"));
        Console.ReadLine();
        obs1.Subscribe(
            i => Console.WriteLine("Third {0}", i), 
            () => Console.WriteLine("Third complete"));
        obs1.Subscribe(
            i => Console.WriteLine("Fourth {0}", i), 
            () => Console.WriteLine("Fourth complete"));
        Console.WriteLine("Press enter to exit");
        Console.ReadLine();
    }
}