反应式扩展SelectMany与大型对象

本文关键字:大型 对象 SelectMany 扩展 反应式 | 更新日期: 2023-09-27 17:58:40

我有一小段代码,它模拟了使用大型对象(即巨大的byte[])的流。对于序列中的每个项,都会调用一个异步方法来获得一些结果。问题是什么?实际上,它抛出OutOfMemoryException

与LINQPad(C#程序)兼容的代码:

void Main()
{
    var selectMany = Enumerable.Range(1, 100)
                   .Select(i => new LargeObject(i))
                   .ToObservable()
                   .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));
    selectMany
        .Subscribe(r => Console.WriteLine(r));
}

private static async Task<int> DoSomethingAsync(LargeObject lo)
{
    await Task.Delay(10000);
    return lo.Id;
}
internal class LargeObject
{
    public int Id { get; }
    public LargeObject(int id)
    {
        this.Id = id;
    }
    public byte[] Data { get; } = new byte[10000000];
}

似乎它同时创建了所有对象。我怎样才能用正确的方法呢?

其基本思想是调用DoSomethingAsync以便为每个对象获得一些结果,所以这就是我使用SelectMany的原因。为了简化,我只介绍了一个Task.Delay,但在现实生活中,它是一个可以同时处理一些项目的服务,所以我想介绍一些并发机制来利用它。

请注意,理论上,一次处理少量项目不应该填满内存。事实上,我们只需要每个"大对象"就可以获得DoSomethingAsync方法的结果。在该点之后,不再使用大对象

反应式扩展SelectMany与大型对象

我觉得自己在重复自己。与您上一个问题和我上一个答案类似,您需要做的是限制bigObjects的数量™以并发创建。

要做到这一点,您需要将对象创建和处理结合起来,并将其放在同一个线程池中。现在的问题是,我们使用异步方法来允许线程在异步方法运行时做其他事情。由于您的慢速网络调用是异步的,因此您的(快速)对象创建代码将使创建大型对象的速度过快。

相反,我们可以使用Rx通过将对象创建与异步调用相结合来计算并发运行的Observable的数量,并使用.Merge(maxConcurrent)来限制并发。

另外,我们还可以设置执行查询的最短时间。只需要Zip,它只需要最小的延迟。

static void Main()
{
    var selectMany = Enumerable.Range(1, 100)
                        .ToObservable()
                        .Select(i => Observable.Defer(() => Observable.Return(new LargeObject(i)))
                            .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)))
                            .Zip(Observable.Timer(TimeSpan.FromMilliseconds(400)), (el, _) => el)
                        ).Merge(4);
    selectMany
        .Subscribe(r => Console.WriteLine(r));
    Console.ReadLine();
}

private static async Task<int> DoSomethingAsync(LargeObject lo)
{
    await Task.Delay(10000);
    return lo.Id;
}
internal class LargeObject
{
    public int Id { get; }
    public LargeObject(int id)
    {
        this.Id = id;
        Console.WriteLine(id + "!");
    }
    public byte[] Data { get; } = new byte[10000000];
}

它似乎同时创建了所有对象。

是的,因为你正在同时创建它们。

如果我简化你的代码,我可以告诉你为什么:

void Main()
{
    var selectMany =
        Enumerable
            .Range(1, 5)
            .Do(x => Console.WriteLine($"{x}!"))
            .ToObservable()
            .SelectMany(i => Observable.FromAsync(() => DoSomethingAsync(i)));
    selectMany
        .Subscribe(r => Console.WriteLine(r));
}
private static async Task<int> DoSomethingAsync(int i)
{
    await Task.Delay(1);
    return i;
}

运行此操作将生成:

1.2.3.4.5.4.3.5.2.1.

由于Observable.FromAsync,您允许源在返回任何结果之前运行到完成。换句话说,您正在快速构建所有大型对象,但处理它们的速度较慢。

您应该允许Rx同步运行,但在默认调度程序上运行,这样您的主线程就不会被阻塞。然后,代码将在没有任何内存问题的情况下运行,并且您的程序将在主线程上保持响应。

这是这个的代码:

var selectMany =
    Observable
        .Range(1, 100, Scheduler.Default)
        .Select(i => new LargeObject(i))
        .Select(o => DoSomethingAsync(o))
        .Select(t => t.Result);

(我已经用Observable.Range(1, 100)有效地替换了Enumerable.Range(1, 100).ToObservable(),因为这也有助于解决一些问题。)

我尝试过测试其他选项,但到目前为止,任何允许DoSomethingAsync异步运行的选项都会出现内存不足错误。

ConcatMap支持开箱即用。我知道这个操作符在.net中不可用,但您可以使用Concat操作符进行同样的操作,该操作符会推迟订阅每个内部源,直到上一个完成。

您可以通过以下方式引入时间间隔延迟:

var source = Enumerable.Range(1, 100)
   .ToObservable()
   .Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (i, ts) => i)
   .Select(i => new LargeObject(i))
   .SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));

因此,它不是一次提取所有100个整数,立即将它们转换为LargeObject,然后对所有100调用DoSomethingAsync,而是将整数逐个滴出,每个间隔一秒。


这就是TPL+Rx解决方案的样子。不用说,它没有单独的Rx或单独的TPL那么优雅。然而,我认为这个问题不太适合Rx:

void Main()
{
    var source = Observable.Range(1, 100);
    const int MaxParallelism = 5;
    var transformBlock = new TransformBlock<int, int>(async i => await DoSomethingAsync(new LargeObject(i)),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelism });
    source.Subscribe(transformBlock.AsObserver());
    var selectMany = transformBlock.AsObservable();
    selectMany
        .Subscribe(r => Console.WriteLine(r));
}