如何关联函数输入和输出

本文关键字:函数 输入 输出 关联 何关联 | 更新日期: 2023-09-27 17:56:56

考虑下面的简单程序。它具有整数的可观察量和一个计算最近发布的整数是偶数还是奇数的函数。出乎意料的是,程序在报告数字更改之前报告最近的数字是否为偶数/奇数。

static void Main(string[] args) {
    int version = 0;
    var numbers = new Subject<int>();
    IObservable<bool> isNumberEven = numbers.Select(i => i % 2 == 0);
    isNumberEven
        .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
        .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));
    numbers
        .Select(i => new { Number = i, Version = Interlocked.Increment(ref version) })
        .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.Number}"));
    numbers.OnNext(1);
    numbers.OnNext(2);
    numbers.OnNext(3);
    Console.ReadLine();
}

输出为:

Time 1 : False
Time 2 : 1
Time 3 : True
Time 4 : 2
Time 5 : False
Time 6 : 3

我认为更改数字会引发一连串的下游影响,这些影响将按发生的顺序报告。交换订阅订单将交换报告结果的方式。我知道 rx 是异步的,事情有可能以非确定的顺序发生。如果我使用.我的函数中的 delay() 或网络调用我不确定何时会报告结果。但在这种情况下,我很惊讶。

为什么这是一件大事?我认为这意味着,如果我想尝试关联函数输入和输出(例如打印发布的数字以及它们是偶数还是奇数),我必须在输出结果中包含输入参数,如下所示:

var isNumberEven = numbers.Select(i => new {
    Number = i,
    IsEven = i % 2 == 0
});

我想我可以构建一堆简单的小函数,然后使用 rx 运算符组合它们来完成复杂的计算。但也许我不能使用 rx 运算符来组合/连接/关联结果。当我定义每个函数时,我必须自己关联输入和输出。

在某些情况下,我可以使用 rx 运算符来关联结果。如果每个输入都生成一个输出,我可以压缩两者。但是一旦你做了类似限制输入的事情,它就不再起作用了。

这个版本的程序似乎确实以合理的方式报告数字是偶数还是奇数。

static void Main(string[] args) {
    var numbers = new Subject<int>();
    var isNumberEven = numbers.Select(i => i % 2 == 0);
    var publishedNumbers = numbers.Publish().RefCount();
    var report =
        publishedNumbers
        .GroupJoin(
            isNumberEven,
            (_) => publishedNumbers,
            (_) => Observable.Empty<bool>(),
            (n, e) => new { Number = n, IsEven = e })
        .SelectMany(i => i.IsEven.Select(j => new { Number = i.Number, IsEven = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Number} {(i.IsEven ? "even" : "odd")}"));
    numbers.OnNext(1);
    numbers.OnNext(2);
    numbers.OnNext(3);
    Console.ReadLine();
}

输出如下所示:

1 odd
2 even
3 odd

但我不知道这是一个幸运的巧合,还是我是否可以依靠它。Rx 中的哪些操作按确定性顺序发生?哪些是不可预测的?我是否应该定义所有函数以在结果中包含输入参数?

如何关联函数输入和输出

您的第一个程序的行为完全符合我的预期,并且确定性地如此。

我知道 rx 是异步的,事情有可能以非确定的顺序发生。

如果引入非确定性

行为(如并发/调度),事情只会以非确定性顺序发生,否则 Rx 是确定性的。

这里有几个问题/误解在起作用。1) 可变的外部状态 - version2)主题的使用(但在这个样本中根本不是一个问题)3)对回调方式的误解。

让我们只关注 3)。如果我们带你编写代码并将其解包到它的基本调用站点,你可能会看到 Rx 在幕后是多么简单。

numbers.OnNext(1);主题将查找其订阅,并按照订阅的顺序OnNext每个订阅。

IObservable<bool> isNumberEven = numbers.Select(i => i % 2 == 0);
isNumberEven
    .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
    .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));

也可以减少到

numbers.Select(i => i % 2 == 0)
    .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) })
    .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}"));

有人可能会争辩说,由于isNumberEven从未在其他任何地方使用过,因此您应该将其简化为此。

所以我们可以看到我们有第一个订阅者。实际上,它将运行的代码是这样的

private void HandleOnNext(int i)
{
    var isEven = i % 2 == 0
    var temp = new { IsEven = isEven , Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp .Version} : {temp .IsEven}");
}

我们的第二个订阅者(因为 .Subscribe( 方法是在偶数订阅之后调用的)是numbers订阅者。他的代码可以有效地归结为

private void HandleOnNext(int i)
{
    var temp = new { Number = i, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.Number}");
}

所以一旦你完全解构了代码,你最终基本上会得到这个

void Main()
{
    int version = 0;
    //numbers.OnNext(1);
    ProcessEven(1, ref version);
    ProcessNumber(1, ref version);
    //numbers.OnNext(2);
    ProcessEven(2, ref version);
    ProcessNumber(2, ref version);
    //numbers.OnNext(3);
    ProcessEven(3, ref version);
    ProcessNumber(3, ref version);
}
// Define other methods and classes here
private void ProcessEven(int i, ref int version)
{
    var isEven = i % 2 == 0;
    var temp = new { IsEven = isEven, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.IsEven}");
}
private void ProcessNumber(int i, ref int version)
{
    var temp = new { Number = i, Version = Interlocked.Increment(ref version) };
    Console.WriteLine($"Time {temp.Version} : {temp.Number}");
}

一旦所有的回调和订阅都统一了,那么你可以看到这不是奇迹发生,一切都是确定性的。

我是否应该定义所有函数以在结果中包含输入参数?

要回答您的问题(鉴于您对 Rx 的误解,我犹豫不决),您只需要在结果序列的顺序不确定时才这样做。例如,如果您一次发出多个 Web 请求。您无法确定它们是否会按照您发送的顺序做出响应。但是,您可以强制这些方案与运算符的使用保持一致,例如Concat