在c#响应式框架中,当源流和目标流相同时,SelectMany是如何工作的?

本文关键字:SelectMany 何工作 工作 框架 响应 目标 | 更新日期: 2023-09-27 18:17:31

我有两个Subject,一个是具有ID的人对象流,另一个是代表谁与谁成为朋友的xref ID流。下面是一些简化的代码:

class Program {
    static void Main() {
        // Set up observables
        var people = new Subject<Person>();
        var friendMap = new Subject<FriendMap>();
        var friendNotices = from p1 in people
                            from p2 in people
                            from pair in friendMap
                            where p1.Id == pair.Id1 && p2.Id == pair.Id2
                            select p1.Name + " befriended " + p2.Name;
        // Subscribe to log
        friendNotices.Subscribe(Console.WriteLine);
        // Add people
        people.OnNext(new Person(1, "Alice"));
        people.OnNext(new Person(2, "Bob"));
        // Add relationships
        friendMap.OnNext(new FriendMap(1, 2)); // => "Alice befriended Bob"
        friendMap.OnNext(new FriendMap(2, 1)); // Doesn't show up!
    }
}
class Person {
    public Person(int id, string name) {
        Id = id;
        Name = name;
    }
    public string Name;
    public int Id;
}
class FriendMap {
    public FriendMap(int id1, int id2) {
        Id1 = id1;
        Id2 = id2;
    }
    public int Id1;
    public int Id2;
}

我遇到的问题是,有时添加xref不会导致friendNotice事件。特别是,如果具有Id2的人在具有Id1的人之前创建,则似乎会失败。

这是一个bug在Rx或bug在我的代码?不管怎样,我该如何让它工作呢?

("成为朋友"在我的应用程序中是不可交换的——Alice和Bob成为朋友的关系与Bob和Alice成为朋友的关系是不同的,所以"只是交换id然后重试"在我的情况下不是一个可用的解决方案)

在c#响应式框架中,当源流和目标流相同时,SelectMany是如何工作的?

这里的问题是对SelectMany函数如何工作的误解(这是linq推导式from ... from ...所映射的操作符)。

该构造从源流中获取每个元素,并将其投影到目标流中。它通过在目标流上为源的每个元素创建订阅来为投影服务来实现这一点。

让我们用一小段伪代码来检查一下。只考虑查询的这一部分:

from p1 in people
from p2 in people

现在,让我们把人群分成人群a和人群b:

from p1 in peopleA
from p2 in peopleB

如果我们现在调用:

peopleA.OnNext(Alice);

实际发生的是,将在peopleB上创建一个新的订阅,以服务于Alice对peopleB的投影。此时,peopleB中没有元素,因此不会发生投影。

现在如果我们调用:

peopleB.OnNext(Tom);

将运行Alice -> peopleB的投影并输出(Alice, Tom)。

现在称:

peopleB.OnNext(Dick);

现在Alice -> peopleB的投影继续,所以(Alice, Dick)将输出。

现在称:

peopleA.OnNext(Bob);

现在在peopleB上开始Bob的新订阅-但是在peopleB发出之前不会输出任何内容。

现在称:

peopleB.OnNext(Harry);

同时运行Alice和Bob订阅,我们将得到(Alice, Harry)和(Bob, Harry);

给你一个简单的例子:

var source = new Subject<string>();
var source2 = new Subject<string>();
var res = from s in source
          from t in source2
          select s + " " + t;
res.Subscribe(Console.WriteLine);
source.OnNext("A"); 
source2.OnNext("1");
source2.OnNext("2");
source.OnNext("B");
source2.OnNext("3");

将给出输出:

A 1
A 2
A 3
B 3

回到"self SelectMany"。现在一切开始变得有点棘手了。关键的部分是,正在设置的订阅不会捕获触发其设置的"当前"项目。因此,让我们标记SelectMany的每个部分A和B:

from p1 in people (call this A)
from p2 in people (call this B)

调用时:

people.OnNext(Alice);

Alice在A上的订阅将在B上进行,但由于它是在Alice发出后进行的,因此它不会捕获她,也不会输出任何内容。

现在我们调用:

people.OnNext(Bob);

Alice的订阅将在B上看到Bob,这导致输出(Alice, Bob)。Bob在A上的订阅将在B上创建,但同样不会输出任何内容,因为它错过了Bob的输出。

这就是你所看到的。唯一的组合是(Alice, Bob)。

你可以修复这个问题,虽然让人们在新的订阅设置时重播它的内容。像这样修改示例的第一部分:

    // Set up observables
    var people = new Subject<Person>();
    var peopleR = people.Replay().RefCount();
    var friendMap = new Subject<FriendMap>();
    var friendNotices = from p1 in peopleR
                        from p2 in peopleR
                        from pair in friendMap
                        where p1.Id == pair.Id1 && p2.Id == pair.Id2
                        select p1.Name + " befriended " + p2.Name;

…但是,如果您的流长时间运行,这当然是不可取的,因为您将所有内容缓存在内存中。

就你的特殊问题而言,我认为你最好走另一条路——但是在不知道你想要达到的目标的情况下很难规定,这已经是一篇很长的文章了!一种方法是对FriendMap条目进行简单的订阅,只需查找朋友的名字就可以输出所需的消息。(我可能错过了更大的画面!)。

希望你能理解你的方法的问题。