为什么我不需要在这个冷观察对象上发布

本文关键字:对象 观察 不需要 为什么 | 更新日期: 2023-09-27 18:14:12

既然我在这里有一个冷Observable,我订阅了几次"分组",为什么我不需要在这里发布?我本来希望它带来不想要的结果,当我运行它,但令我惊讶的是,它的工作与不发布。为什么呢?

var subject = new List<string>
    {                            
    "test",                        
    "test",                 
    "hallo",
    "test",
    "hallo"
    }.ToObservable();
subject
    .GroupBy(x => x)
    .SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
         .Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
    .Subscribe(result => Console.WriteLine("You typed {0} {1} times", 
         result.Chars, result.Count));
// I Would have expect that I need to use Publish like that
//subject
//   .GroupBy(x => x)
//   .SelectMany(grouped => grouped.Publish(sharedGroup => 
//       sharedGroup.Scan(0, (count, _) => ++count)
//       .Zip(sharedGroup, (count, chars) => 
//           new { Chars = chars, Count = count })))
//   .Subscribe(result => Console.WriteLine("You typed {0} {1} times", 
//       result.Chars, result.Count));
Console.ReadLine();

编辑

正如Paul注意到的,由于我们订阅了底层冷可观察对象两次,我们应该遍历该序列两次。然而,我没有运气使这个效果可见。我试图插入调试行,但例如,这只打印一次"执行"。

var subject = new List<Func<string>>
{                            
() =>
    {
        Console.WriteLine("performing");
        return "test";
    },                        
() => "test",                 
() => "hallo",
() => "test",
() => "hallo"
}.ToObservable();

subject
    .Select(x => x())
    .GroupBy(x => x)
    .SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
            .Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
    .Subscribe(result => Console.WriteLine("You typed {0} {1} times",
            result.Chars, result.Count));

我想知道我们是否可以使效果可见,我们正在处理一个冷观察,而不使用Publish()。在另一个步骤中,我想看看Publish()(见上文)如何使效果消失。

编辑2

正如Paul建议的那样,我创建了一个自定义IObservable<string>用于调试目的。然而,如果你在它的Subscribe()方法中设置了一个断点,你会注意到它只会被击中一次

class Program
{
    static void Main(string[] args)
    {
        var subject = new MyObservable();
        subject
            .GroupBy(x => x)
            .SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
                 .Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
            .Subscribe(result => Console.WriteLine("You typed {0} {1} times",
                 result.Chars, result.Count));
       Console.ReadLine();
   }
}
class MyObservable : IObservable<string>
{
    public IDisposable Subscribe(IObserver<string> observer)
    {
        observer.OnNext("test");
        observer.OnNext("test");
        observer.OnNext("hallo");
        observer.OnNext("test");
        observer.OnNext("hallo");
        return Disposable.Empty;
    }
}

所以对我来说这个问题还没有解决。为什么在这个冷的Observable上不需要Publish呢?

为什么我不需要在这个冷观察对象上发布

您只使用一次基于列表的源代码,因此您不会在那里看到重复的订阅效果。回答你的问题的关键是以下观察:

An igroupedoobservable <K,>从GroupBy流出的对象本身就是一个伪装的主题。

GroupBy内部保存一个Dictionary<K,>>。每当消息传入时,它就会使用相应的密钥发送到主题中。您订阅了两次分组对象,这是安全的,因为主题将生产者与消费者解耦。

在Zip中重用'grouped'意味着您将有效地对每个分组进行两次分组-然而,由于您的源代码是Cold,因此它仍然有效。明白了吗?