轮询存储库直到返回有效值的可观察序列

本文关键字:观察 有效值 返回 存储 | 更新日期: 2023-09-27 18:25:25

我必须轮询数据库,直到它包含有效的数据。

要做到这一点,我有一个存储库,它应该每n秒查询一次,以便获得一个我自己的实体,称为DestinationResponse

class DestinationResponse 
{
     bool HasDestination { get; set; }
     bool Destination { get; set; }
}

当DestinationResponse的属性为HasDestinationtrue时,返回Destination

因此,我的可观察序列应该得到所有等待HasDestination=true的响应。它基本上是在等待HasDestination设置为true的响应。当这种情况发生时,它会返回它并完成序列。它最多只推送一个元素!

我目前的做法是:

var pollingPeriod = TimeSpan.FromSeconds(n);
var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DestinationPoller"});
var observable = Observable.Interval(pollingPeriod, scheduler)                
    .SelectMany(_ => destinationRepository.GetDestination().ToObservable())
    .TakeWhile(response => !response.HasDestination)
    .TakeLast(1)
    .Select(response => response.Destination);

我知道我错了,主要是因为即使对GetDestination的最后一次调用还没有完成,Interval调用也会继续生成。

注意:CCD_ 7返回一个CCD_。

轮询存储库直到返回有效值的可观察序列

将数据库轮询的答案与Reactive Extensions合并到您的示例代码中,我认为可以满足您的需求。

var pollingPeriod = TimeSpan.FromSeconds(n);
var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DestinationPoller"});
var query = Observable.Timer(pollingPeriod , scheduler)
    .SelectMany(_ => destinationRepository.GetDestination().ToObservable())
    .TakeWhile(response => response.HasDestination)
    .Retry()    //Loop on errors
    .Repeat()  //Loop on success
    .Select(response => response.Destination)
    .Take(1);

这段代码可能是我想要的查询。你觉得怎么样?

private IObservable<Destination> CreateOrderDestinationObservable(string boxId, int orderId)
{
    var pollingPeriod = TimeSpan.FromSeconds(DestinationPollingDelay);
    var scheduler = new EventLoopScheduler(ts => new Thread(ts) {Name = "DestinationPoller"});
    var observable = Observable.Timer(pollingPeriod, scheduler)
        .SelectMany(_ => externalBridgeRepository.GetDestination(boxId, orderId).ToObservable())
        .Where(response => response.HasDestination)
        .Retry()
        .Repeat()
        .Take(1)
        .Select(response => response.Destination);
    return observable;
}