在Rx.Net中,如何实现可观察的反馈循环,直到反馈耗尽
本文关键字:观察 循环 Net Rx 何实现 实现 | 更新日期: 2023-09-27 18:20:51
我有以下API:
IObservable<IList<SqlDataRecord>> WriteToDBAndGetFailedSource(SqlConnection conn, IList<SqlDataRecord> batch)
它尝试将批处理写入数据库。如果失败,则返回整个批次,否则返回的可观察对象为空。
我还有一个生产批次的来源:
IObservable<IList<SqlDataRecord>> GetDataSource(string filePath, int bufferThreshold)
现在,我可以这样组合它们:
var failedBatchesSource = GetDataSource(filePath, 1048576)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100);
这将写入所有批次(最多同时写入100个),并返回失败批次的可观察结果。
我真正想要的是在暂停一段时间后将失败的批次反馈回批次的来源,可能是在原始来源仍在生产批次时。当然,我可以写这样的东西:
var failedBatchesSource = GetDataSource(filePath, 1048576)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100)
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100);
但这当然是错误的,因为:
- 这打破了在再次处理失败的批次之前暂停的要求
- 它可能会生成100多个对数据库的并发写入请求
- 这就像用未知的迭代次数展开for循环一样——毫无成效
一旦我收集了所有的失败,我也可以突破可观察到的monad,并在一个循环中重新开始:
var src = GetDataSource(filePath, 1048576);
for (;;)
{
var failed = await src
.Select(batch => WriteToDBAndGetFailedSource(conn, batch))
.Merge(100)
.ToList();
if (failed.Count == 0)
{
break;
}
src = failed.ToObservable();
}
但我想知道,当我停留在可观察的monad范围内时,我是否能做得更好。
我认为这可能会奏效
public static IObservable<T> ProcessAll<T>(this IObservable<T> source, Func<T, IObservable<T>> processor, int mergeCount, TimeSpan failureDelay)
{
return Observable.Create<T>(
observer =>
{
var failed = new Subject<T>();
return source.Merge(failed)
.Select(processor)
.Merge(mergeCount)
.Delay(failureDelay)
.Subscribe(failed.OnNext, observer.OnError, observer.OnCompleted);
});
}
并像这样使用:
GetDataSource(filePath, 1048576)
.ProcessAll(batch => WriteToDBAndGetFailedSource(conn, batch), 100, TimeSpan.FromMilliseconds(500))
.Subscribe();
ProcessAll是一个可怕的名字,但现在是星期五晚上,我想不出更好的名字了。
使用Observable.Buffer。它允许您进行缓冲,直到您有100条记录要发送,或者直到X段时间过去。
或者,Observable.Interval只需每隔X个时间间隔触发一次。您可以在处理发布事件时添加并发限制。
只要有对象要发布,其中任何一个都应该重复激发。