Rx Amb extension
本文关键字:extension Amb Rx | 更新日期: 2023-09-27 18:34:51
我正在使用Silverlight的Reactive框架,并希望实现以下目标。
我正在尝试为 Silverlight 客户端创建一个典型的数据提供程序,该提供程序还利用了 MS Ent Lib 中提供的缓存框架。这些方案要求我必须在访问 WCF 数据客户端之前签入缓存中的键值对。
通过使用 Rx 扩展 Amb,我能够从缓存或 WCF 数据客户端(以先返回者为准(中提取数据,但如果值位于缓存中,如何阻止 WCF 客户端执行调用?
我还想考虑竞争条件,例如,如果第一个订阅者请求一些数据并且提供程序正在从 WCF 数据客户端(异步(获取数据,我如何防止后续异步请求执行相同的操作(在此阶段,缓存尚未填充(。
我遇到了完全相同的问题。我用具有以下签名的扩展方法解决了它:
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, R> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler) where R : class
实际上,这样做是获取源可观察量并返回一个可观察量,该可观察量将每个输入值与其输出值匹配。
要获取每个输出值,它将首先检查缓存。如果该值存在于缓存中,则使用该值。如果没有,它将仅在缓存中没有的值上启动fetch
函数。如果所有值都在缓存中,则 fetch
函数永远不会启动 - 因此没有服务连接设置惩罚等。
我会给你代码,但它基于使用Maybe<T>
monad 的扩展方法的略有不同的版本 - 所以你可能会发现你需要摆弄实现。
在这里:
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
where R : class
{
return source.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
{
var results = new Subject<R>();
var disposables = new CompositeDisposable();
var loop = new EventLoopScheduler();
disposables.Add(loop);
var sourceDone = false;
var pairsDone = true;
var exception = (Exception)null;
var fetchIn = new Subject<T>();
var fetchOut = (IObservable<R>)null;
var pairs = (IObservable<KeyValuePair<int, R>>)null;
var lookup = new Dictionary<T, int>();
var list = new List<Maybe<R>>();
var cursor = 0;
Action checkCleanup = () =>
{
if (sourceDone && pairsDone)
{
if (exception == null)
{
results.OnCompleted();
}
else
{
results.OnError(exception);
}
loop.Schedule(() => disposables.Dispose());
}
};
Action dequeue = () =>
{
while (cursor != list.Count)
{
var mr = list[cursor];
if (mr.HasValue)
{
results.OnNext(mr.Value);
cursor++;
}
else
{
break;
}
}
};
Action<KeyValuePair<int, R>> nextPairs = kvp =>
{
list[kvp.Key] = Maybe<R>.Something(kvp.Value);
dequeue();
};
Action<Exception> errorPairs = ex =>
{
fetchIn.OnCompleted();
pairsDone = true;
exception = ex;
checkCleanup();
};
Action completedPairs = () =>
{
pairsDone = true;
checkCleanup();
};
Action<T> sourceNext = t =>
{
var mr = cache(t);
list.Add(mr);
if (mr.IsNothing)
{
lookup[t] = list.Count - 1;
if (fetchOut == null)
{
pairsDone = false;
fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
pairs = fetchIn.Select(x => lookup[x]).Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
disposables.Add(pairs.ObserveOn(loop).Subscribe(nextPairs, errorPairs, completedPairs));
}
fetchIn.OnNext(t);
}
else
{
dequeue();
}
};
Action<Exception> errorSource = ex =>
{
sourceDone = true;
exception = ex;
fetchIn.OnCompleted();
checkCleanup();
};
Action completedSource = () =>
{
sourceDone = true;
fetchIn.OnCompleted();
checkCleanup();
};
disposables.Add(source.ObserveOn(loop).Subscribe(sourceNext, errorSource, completedSource));
return results.ObserveOn(scheduler);
}
示例用法如下所示:
您将拥有要获取的索引的来源:
IObservable<X> source = ...
您将有一个可以从缓存中获取值的函数和一个可以放入它们的操作(两者都应该是线程安全的(:
Func<X, Y> getFromCache = x => ...;
Action<X, Y> addToCache = (x, y) => ...;
然后,您将有实际调用从数据库或服务中获取数据:
Func<X, Y> getFromService = x => ...;
然后你可以像这样定义fetch
:
Func<IObservable<X>, IObservable<Y>> fetch =
xs => xs.Select(x =>
{
var y = getFromService(x);
addToCache(x, y);
return y;
});
最后,您可以通过调用以下内容进行查询:
IObservable<Y> results =
source.FromCacheOrFetch(
getFromCache,
fetch,
Scheduler.ThreadPool);
当然,您需要订阅结果才能进行计算。
显然,Amb
不是正确的方法,因为这每次都会同时击中缓存和服务。如果缓存未命中,EntLib 会返回什么?
请注意,Observable.Timeout
是一个合理的选择:
cache(<paramters>).Timeout(TimeSpan.FromSeconds(1), service<paramters>);
但显然,如果您想处理来自 EntLib 的返回并采取适当的行动,那么超时不是一个好主意。
我不明白为什么这必然是一个反应式扩展问题。
一种简单的方法,可能不如@Enigmativity的解决方案功能齐全,可能是类似于以下内容:
public IObservable<T> GetCachedValue<TKey, TResult>(TKey key, Func<TKey, TResult> getFromCache, Func<TKey, TResult> getFromSource)
{
return getFromCache(<key>).Concat(getFromSource(<key>).Take(1);
}
这只是一个松散的想法,您需要添加:
- 一种将项添加到缓存或假定 getFromSource 缓存结果的机制
- 某种线程安全性,以防止对同一未缓存密钥的源进行多次命中(如果需要(
- getFromCache 需要返回 Observable.Empty(( 如果该项不在缓存中。
但是,如果您想要一些简单的东西,那么开始就不错了。