我怎样才能让这个Observable更容易被重用呢?
本文关键字:更容易 Observable | 更新日期: 2023-09-27 18:12:20
这是一个从web服务中检索分页数据的可观察序列。每个web服务响应都包含一个nextRecordsUrl
,它指示从哪里获得下一组记录。把这个Observable转换成更可重用的东西的最好方法是什么?
Web服务设置:
var auth = new AuthenticationClient ();
await auth.UsernamePasswordAsync (consumerKey, consumerSecret, userName, password + passwordSecurityToken);
var forceClient = new ForceClient (auth.InstanceUrl, auth.AccessToken, auth.ApiVersion);
可见:
var observable = Observable.Create<QueryResult<Account>> (async (IObserver<QueryResult<Account>> o) =>
{
try
{
var queryResult = await forceClient.QueryAsync<Account> ("SELECT Id, Name from Account");
if (queryResult != null)
{
o.OnNext (queryResult);
while (!string.IsNullOrEmpty (queryResult.nextRecordsUrl))
{
queryResult = await forceClient.QueryContinuationAsync<Account> (queryResult.nextRecordsUrl);
if (queryResult != null)
{
o.OnNext (queryResult);
}
}
}
o.OnCompleted ();
}
catch (Exception ex)
{
o.OnError (ex);
}
return () => {};
});
订阅可观察对象并收集结果:
var accounts = new List<Account> ();
observable.Subscribe (
observer => accounts.AddRange (observer.records),
ex => Console.WriteLine (ex.Message),
() => {});
编辑:使用Brandon的解决方案,我现在可以生成结果列表与聚合
List<Account> accounts = await forceClient.QueryPages<Account> ("SELECT Id, Name from Account")
.Aggregate (new List<Account> (), (list, value) =>
{
list.AddRange (value.records);
return list;
});
信不信由你,rx - experimental库(也由MS维护)有一个叫做Expand
的算子。Expand用于从可观察对象中获取每个元素,并通过生成另一个相同类型的可观察对象的函数运行它。然后,该可观察对象被平展为原始对象,其中的每个项都经过相同的过程。
假设给定一个具有子节点的可观察树节点。您可以使用expand来轻松地遍历这棵树。由于链表只是树的一个约束版本,并且由于你所拥有的实际上是一个链表,其中每个节点都是一个可观察对象,所以你可以使用expand。
public static IObservable<QueryResult<TResult>> QueryPages<TResult>(this ForceClient forceClient, string query)
{
return Observable.FromAsync(() => forceClient.QueryAsync<TResult>(query))
.Where(QueryResultIsValid)
.Expand(result =>
Observable.FromAsync(() => forceClient.QueryContinuationAsync<TResult>(queryResult.nextRecordsUrl))
.Where(QueryResultIsValid)
);
}
public static bool QueryResultIsValid(QueryResult<TResult> result)
{
return result != null;
}
您正在寻找这样的东西吗?
public static IObservable<QueryResult<TResult>> QueryPages<TResult>(this ForceClient forceClient, string query)
{
return Observable.Create<QueryResult<T>> (async (observer, token) =>
{
// No need for try/catch. Create() will call OnError if your task fails.
// Also no need for OnCompleted(). Create() calls it when your task completes
var queryResult = await forceClient.QueryAsync<TResult> (query);
while (queryResult != null)
{
observer.OnNext (queryResult);
// check the token *after* we call OnNext
// because if an observer unsubscribes
// it typically occurs during the notification
// e.g. they are using .Take(..) or
// something.
if (string.IsNullOrEmpty(queryResult.nextRecordsUrl) ||
token.IsCancellationRequested)
{
break;
}
queryResult = await forceClient.QueryContinuationAsync<TResult> (queryResult.nextRecordsUrl);
}
// No need to return anything. Just the Task itself is all that Create() wants.
}
});
// Usage:
var forceClient = // ...
var foos = forceClient.QueryPages<Foo>("SELECT A, B, C FROM Foo");
注意,我把它切换到重载,它提供了一个取消令牌,这样如果观察者取消订阅,你可以停止获取页面(你的原始版本将继续获取页面,即使观察者已经停止侦听)。还请注意,异步Create
等待您的Task
并为您调用OnError
或OnCompleted
,因此您无需担心大多数时间。