只有当值不在本地缓存中时,才调用昂贵的响应扩展IObservable函数

本文关键字:调用 响应 函数 IObservable 扩展 缓存 | 更新日期: 2023-09-27 17:52:11

我正在使用响应式扩展(Reactive Extensions, Rx)和存储库模式来方便地从相对较慢的数据源获取数据。我有以下(简化的)接口:

public interface IStorage
{
    IObservable<INode> Fetch(IObservable<Guid> ids);
}

创建IStorage实现的实例是缓慢的-想想创建web服务或db连接。ids观测中的每个Guid都会在返回观测中产生一个一对一的INode(或null),每个结果都是昂贵的。因此,只有当我至少有一个值要获取,然后使用IStorage仅为每个Guid获取一次值时,才实例化IStorage对我来说是有意义的。

为了限制对IStorage的调用,我在Repository类中缓存结果,如下所示:
public class Repository
{
    private Dictionary<Guid, INode> NodeCache { get; set; }
    private Func<IStorage> StorageFactory { get; set; }
    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        var lazyStorage = new Lazy<IStorage>(this.StorageFactory);
        // from id in ids
        // if NodeCache contains id select NodeCache[id]
        // else select node from lazyStorage.Value.Fetch(...)
    }
}

Repository.Fetch(...)方法中,我包含了指示我正在尝试做什么的注释。

本质上,如果NodeCache包含所有被提取的id,那么IStorage永远不会被实例化,并且几乎没有延迟返回结果。然而,如果任何一个id不在缓存中,那么IStorage将被实例化,所有未知的id将通过IStorage.Fetch(...)方法传递。

需要维护一对一的映射,包括顺序保持。

任何想法?

只有当值不在本地缓存中时,才调用昂贵的响应扩展IObservable函数

我花了一点时间才算出来,但我终于有了自己的解决方案。

我定义了两个名为FromCacheOrFetch的扩展方法,它们具有以下签名:

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, R> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class
IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source,
    Func<T, Maybe<R>> cache,
    Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)

第一个使用标准CLR/Rx类型,第二个使用Maybe monad(可空类型,不限于值类型)。

第一个方法只是将Func<T, R>转换为Func<T, Maybe<R>>,并调用第二个方法。

背后的基本思想是,当要查询源时,将检查缓存中的每个值,以查看结果是否已经存在,如果存在,则立即返回结果。然而,如果任何结果丢失,那么只有在这种情况下,通过传入Subject<T>调用fetch函数,现在所有缓存丢失都通过fetch函数传递。调用代码负责将结果添加到缓存中。代码通过fetch函数异步处理所有值,并将结果以及缓存的结果重新组装成正确的顺序。

效果很好。: -)

代码如下:

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
        where R : class
{
    return source
        .FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source,
    Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch,
    IScheduler scheduler)
{
    var results = new Subject<R>();
    var disposables = new CompositeDisposable();
    var loop = new EventLoopScheduler();
    disposables.Add(loop);
    var sourceDone = false;
    var pairsDone = true;
    var exception = (Exception)null;
    var fetchIn = new Subject<T>();
    var fetchOut = (IObservable<R>)null;
    var pairs = (IObservable<KeyValuePair<int, R>>)null;
    var lookup = new Dictionary<T, int>();
    var list = new List<Maybe<R>>();
    var cursor = 0;
    Action checkCleanup = () =>
    {
        if (sourceDone && pairsDone)
        {
            if (exception == null)
            {
                results.OnCompleted();
            }
            else
            {
                results.OnError(exception);
            }
            loop.Schedule(() => disposables.Dispose());
        }
    };
    Action dequeue = () =>
    {
        while (cursor != list.Count)
        {
            var mr = list[cursor];
            if (mr.HasValue)
            {
                results.OnNext(mr.Value);
                cursor++;
            }
            else
            {
                break;
            }
        }
    };
    Action<KeyValuePair<int, R>> nextPairs = kvp =>
    {
        list[kvp.Key] = Maybe<R>.Something(kvp.Value);
        dequeue();
    };
    Action<Exception> errorPairs = ex =>
    {
        fetchIn.OnCompleted();
        pairsDone = true;
        exception = ex;
        checkCleanup();
    };
    Action completedPairs = () =>
    {
        pairsDone = true;
        checkCleanup();
    };
    Action<T> sourceNext = t =>
    {
        var mr = cache(t);
        list.Add(mr);
        if (mr.IsNothing)
        {
            lookup[t] = list.Count - 1;
            if (fetchOut == null)
            {
                pairsDone = false;
                fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
                pairs = fetchIn
                    .Select(x => lookup[x])
                    .Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
                disposables.Add(pairs
                    .ObserveOn(loop)
                    .Subscribe(nextPairs, errorPairs, completedPairs));
            }
            fetchIn.OnNext(t);
        }
        else
        {
            dequeue();
        }
    };
    Action<Exception> errorSource = ex =>
    {
        sourceDone = true;
        exception = ex;
        fetchIn.OnCompleted();
        checkCleanup();
    };
    Action completedSource = () =>
    {
        sourceDone = true;
        fetchIn.OnCompleted();
        checkCleanup();
    };
    disposables.Add(source
        .ObserveOn(loop)
        .Subscribe(sourceNext, errorSource, completedSource));
    return results.ObserveOn(scheduler);
}

类似这样的内容(我假设您只想为所有订阅者实例化存储一次):

public class Repository
{
    public Repository()
    {
        _lazyStorage = new Lazy<IStorage>(StorageFactory);
    }
    private readonly Lazy<IStorage> _lazyStorage;
    private Dictionary<Guid, INode> NodeCache { get; set; }
    private Func<IStorage> StorageFactory { get; set; }
    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        return Observable
            .CreateWithDisposable<INode>(observer =>
                ids.Subscribe(x =>
                {
                    INode node;
                    if (NodeCache.TryGetValue(x, out node))
                        observer.OnNext(node);
                    else
                    {
                        node = _lazyStorage.Value.Fetch(x);
                        NodeCache[x] = node;
                        observer.OnNext(node);
                    }
                }, observer.OnError, observer.OnCompleted));
    }
}

编辑:嗯,这个订单一边保存一边保管。异步取很有趣——等待存储。Fetch应该阻塞所有未来的值…思维…

我想我明白了…也许……如果你需要保持秩序,你需要排队。在RX世界中,队列是。concat。下面的内容对你有用吗?

public class Repository
{
    public Repository()
    {
        _lazyStorage = new Lazy<IStorage>(StorageFactory);
    }
    private readonly Lazy<IStorage> _lazyStorage;
    private Dictionary<Guid, INode> NodeCache { get; set; }
    private Func<IStorage> StorageFactory { get; set; }
    private IObservable<INode> Fetcher(Guid id)
    {
        return Observable.Defer(() =>
        {
            INode node;
            return NodeCache.TryGetValue(id, out node)
                ? Observable.Return(node)
                : _lazyStorage.Value.Fetch(id).Do(x => NodeCache[id] = x);
        });
    }
    public IObservable<INode> Fetch(IObservable<Guid> ids)
    {
        return ids.Select(Fetcher).Concat();
    }
}