在Rx.Net中,如何实现可观察的反馈循环,直到反馈耗尽

本文关键字:观察 循环 Net Rx 何实现 实现 | 更新日期: 2023-09-27 18:20:51

我有以下API:

IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn, IList<SqlDataRecord> batch)

它尝试将批处理写入数据库。如果失败,则返回整个批次,否则返回的可观察对象为空。

我还有一个生产批次的来源:

IObservable<IList<SqlDataRecord>> GetDataSource(string filePath, int bufferThreshold)

现在,我可以这样组合它们:

var failedBatchesSource = GetDataSource(filePath, 1048576)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100);

这将写入所有批次(最多同时写入100个),并返回失败批次的可观察结果。

我真正想要的是在暂停一段时间后将失败的批次反馈回批次的来源,可能是在原始来源仍在生产批次时。当然,我可以写这样的东西:

var failedBatchesSource = GetDataSource(filePath, 1048576)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100)
  .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
  .Merge(100);

但这当然是错误的,因为:

  1. 这打破了在再次处理失败的批次之前暂停的要求
  2. 它可能会生成100多个对数据库的并发写入请求
  3. 这就像用未知的迭代次数展开for循环一样——毫无成效

一旦我收集了所有的失败,我也可以突破可观察到的monad,并在一个循环中重新开始:

            var src = GetDataSource(filePath, 1048576);
            for (;;)
            {
                var failed = await src
                    .Select(batch => WriteToDBAndGetFailedSource(conn, batch))
                    .Merge(100)
                    .ToList();
                if (failed.Count == 0)
                {
                    break;
                }
                src = failed.ToObservable();
            }

但我想知道,当我停留在可观察的monad范围内时,我是否能做得更好。

在Rx.Net中,如何实现可观察的反馈循环,直到反馈耗尽

认为这可能会奏效

public static IObservable<T> ProcessAll<T>(this IObservable<T> source, Func<T, IObservable<T>> processor, int mergeCount, TimeSpan failureDelay)
{
    return Observable.Create<T>(
        observer =>
            {
                var failed = new Subject<T>();
                return source.Merge(failed)
                        .Select(processor)
                        .Merge(mergeCount)
                        .Delay(failureDelay)
                        .Subscribe(failed.OnNext, observer.OnError, observer.OnCompleted);
            });
}

并像这样使用:

GetDataSource(filePath, 1048576)
  .ProcessAll(batch => WriteToDBAndGetFailedSource(conn, batch), 100, TimeSpan.FromMilliseconds(500))
  .Subscribe();

ProcessAll是一个可怕的名字,但现在是星期五晚上,我想不出更好的名字了。

使用Observable.Buffer。它允许您进行缓冲,直到您有100条记录要发送,或者直到X段时间过去。

或者,Observable.Interval只需每隔X个时间间隔触发一次。您可以在处理发布事件时添加并发限制。

只要有对象要发布,其中任何一个都应该重复激发。