轮询存储库直到返回有效值的可观察序列
本文关键字:观察 有效值 返回 存储 | 更新日期: 2023-09-27 18:25:25
我必须轮询数据库,直到它包含有效的数据。
要做到这一点,我有一个存储库,它应该每n
秒查询一次,以便获得一个我自己的实体,称为DestinationResponse
。
class DestinationResponse
{
bool HasDestination { get; set; }
bool Destination { get; set; }
}
当DestinationResponse的属性为HasDestination
到true
时,返回Destination
。
因此,我的可观察序列应该得到所有等待HasDestination=true
的响应。它基本上是在等待HasDestination设置为true的响应。当这种情况发生时,它会返回它并完成序列。它最多只推送一个元素!
我目前的做法是:
var pollingPeriod = TimeSpan.FromSeconds(n);
var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DestinationPoller"});
var observable = Observable.Interval(pollingPeriod, scheduler)
.SelectMany(_ => destinationRepository.GetDestination().ToObservable())
.TakeWhile(response => !response.HasDestination)
.TakeLast(1)
.Select(response => response.Destination);
我知道我错了,主要是因为即使对GetDestination的最后一次调用还没有完成,Interval调用也会继续生成。
注意:CCD_ 7返回一个CCD_。
将数据库轮询的答案与Reactive Extensions合并到您的示例代码中,我认为可以满足您的需求。
var pollingPeriod = TimeSpan.FromSeconds(n);
var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DestinationPoller"});
var query = Observable.Timer(pollingPeriod , scheduler)
.SelectMany(_ => destinationRepository.GetDestination().ToObservable())
.TakeWhile(response => response.HasDestination)
.Retry() //Loop on errors
.Repeat() //Loop on success
.Select(response => response.Destination)
.Take(1);
这段代码可能是我想要的查询。你觉得怎么样?
private IObservable<Destination> CreateOrderDestinationObservable(string boxId, int orderId)
{
var pollingPeriod = TimeSpan.FromSeconds(DestinationPollingDelay);
var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DestinationPoller"});
var observable = Observable.Timer(pollingPeriod, scheduler)
.SelectMany(_ => externalBridgeRepository.GetDestination(boxId, orderId).ToObservable())
.Where(response => response.HasDestination)
.Retry()
.Repeat()
.Take(1)
.Select(response => response.Destination);
return observable;
}