如何使用Rx.NET为相同的数据生成不同大小的批次

本文关键字:数据 NET 何使用 Rx | 更新日期: 2023-09-27 17:53:52

我有一个可观察的序列IObservable<int>,我想将其转换为IObservable<IList<int>>,同时保留以下要求:

  • 最后的可观察对象是一个批次序列,但有两种类型—— a B a 批次每个包含1000项,而B批次每个包含400项。
  • 原始序列中的每个数字必须批处理两次-一次在A批中,另一次在B批中
  • 生产过程应该是动态的,两种批次应该并行生产。即,首先生产所有 a 批次,然后不接受所有B批次的解决方案。

我可以很容易地使用Buffer算子生成一种批次,但我不知道如何在相同的数据上生成两批。

编辑

这是一个简单的代码,只生成一种批处理。

IObservable<int> source = GetSource(...);
await source
    .Buffer(1000)
    .Select(batch => Observable.FromAsync(() => ProcessBatchAsync(batch)))
    .Merge(MaxConcurrentBatches)
    .DefaultIfEmpty();
...
private async Task<Unit> ProcessBatchAsync(IList<int> batch)
{
   ...
   return Unit.Default;
}

我想要的是:

  • 一个由两项组成的可观察对象,其中每个项是另一个可观察对象的批中只有一种。主观察对象应该在批处理观察对象完成时完成。
  • 一个可观察对象,它产生两种批次。然后我需要根据单子的类型切换不同的操作符。

EDIT2

我需要详细说明约束条件。原始的可观察对象位于SqlReader对象的顶部,订阅它两次意味着创建两次读取器,并且访问数据库的次数增加了一次。我只需要一个订阅。

EDIT3

对于示例数据,我们可以使用Observable.Range(0,10000)。给定这个序列,我需要以任意顺序拥有以下批:
[0..1000), [0..400), [1000..2000),[400..800),[2000..3000),[800..1200),[3000..4000),[1200..1600) ... [9000..10000) ... [9600..10000)

或者您可以使用[0..100]范围,批量为10和4个数字。这并不重要,因为解决方案不应该依赖于批大小或批类型的数量。

它应该适用于有3个批次的10、4和6的数字,例如。或任何其他组合

我想我的约束让人们感到困惑。当我说"交错"时,我并不是说批类型必须严格旋转。这正是我试图解释不同类型的批次必须同时生产的方式。给定A、B、C三个批号,偶尔会出现A两批接两批生产的情况。然而,如果首先是A型的所有批次,然后是B型的所有批次,然后是c型的所有批次,这是不可接受的。

如何使用Rx.NET为相同的数据生成不同大小的批次

我认为这就是你所需要的:

var query =
    Observable
        .Range(0, 10000)
        .Publish(ns =>
            ns
                .Buffer(1000)
                .Concat(Observable.Repeat(new List<int>() as IList<int>))
                .Zip(ns.Buffer(400), (n1s, n2s) => new [] { n1s, n2s })
                .SelectMany(nns => nns)
                .Where(xs => xs.Any()));

按照您的示例输出正确地交错。

如果我把数字减少100倍,那么我得到这个输出:

<>之前0,1, 2, 3, 4, 5, 6, 7, 8, 90 1 2 310,11,12,13,14,15,16,17,18,194、5、6、720、21、22、23、24、25、26、27、28、298、9、10、1130、31、32、33、34、35、36、37、38、3912,13,14,1540、41、42、43、44、45、46、47、48、4916,17,18,1950, 51, 52, 53, 54, 55, 56, 57, 58, 5920,21,22,2360、61、62、63、64、65、66、67、68、6924,25,26,2770, 71, 72, 73, 74, 75, 76, 77, 78, 7928、29、30、3180、81、82、83、84、85、86、87、88、8932 33 34 3590、91、92、93、94、95、96、97、98、9936、37、38、3940 41 42 4344 45 46 4748 49 50 5152 53 54 5556 57 58 5960 61 62 6364 65 66 6768 69 70 7172 73 74 7576 77 78 7980 81 82 8384 85 86 8788 89 90 9192,93,94,9596、97、98、99之前

如果你不需要它们严格交错,那么这是一种泛化n缓冲区的方法:

var buffers = new [] { 1000, 400, 500, 300 };
var source = Observable.Range(0, 10000);
var result = source.Publish(ss => buffers.Select(b => ss.Buffer(b)).Merge());

正如Theo指出的,在第一个查询中使用的Observable.Repeat有可能产生非常大量的对象。如果使用Observable.Range,则不会发生这种情况,但如果使用Observable.Interval,则会出现大问题。

我很懒,根本没有尝试做数学来限制所需的填充符的数量。

很容易修复。

var total_item_count = 100;
var batch_a_size = 10;
var batch_b_size = 4;
var filler =
    Observable
        .Repeat(
            new List<long>() as IList<long>,
            Math.Abs(total_item_count / batch_a_size - total_item_count / batch_b_size) + 1);
var query =
    Observable
        .Interval(TimeSpan.FromSeconds(0.01))
        .Take(total_item_count)
        .Publish(ns =>
            ns
                .Buffer(batch_a_size)
                .Concat(filler)
                .Zip(
                    ns
                        .Buffer(batch_b_size)
                        .Concat(filler),
                    (n1s, n2s) => new [] { n1s, n2s })
                .SelectMany(nns => nns)
                .Where(xs => xs.Any()));

我得到与以前相同的输出,现在它是可配置的每个批大小,没有逃跑的Repeat

下面的方法如何

var observable = Observable.Range(0, 10000);
Task.Run(() => observable.Buffer(400).Subscribe(buffer => /* process buffer */ ));
Task.Run(() => observable.Buffer(1000).Subscribe(buffer => /* process buffer */ ));

两种批次将并行生产。

此代码创建了两个IObservable<IList<int>>序列。

var source = Observable.Range(0,10000)
                       .Publish();
var batchesOf1000 = source.Buffer(1000);
var batchesOf400 = source.Buffer(400);
batchesOf1000.Subscribe(batch => batch.Dump());
batchesOf400.Subscribe(batch => batch.Dump());
var disposable = source.Connect();