使用 Rx 轮询网站
本文关键字:网站 Rx 使用 | 更新日期: 2023-09-27 18:35:06
只是想让我的头脑了解Rx
我使用 Rx 每 2 秒轮询一次网站
var results = new List<MyDTO>();
var cx = new WebserviceAPI( ... );
var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); });
var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe( _ => { cx.StartGetDataAsync(); });
(Web 服务 API 公开了一个 getItemsAsync/getItemsCompleted 事件处理程序类型机制,我正在从中创建可观察量)。
当网站返回时,我将响应的"业务部分"解压缩为IE无数的DTO
。public IObservable<IEnumerable<MyDTO>> GetDataAsync()
{
var o = Observable.FromEventPattern<getItemsCompletedEventHandler,getItemsCompletedEventArgs>(
h => _webService.getItemsCompleted += h,
h => _webService.getItemsCompleted -= h);
return o.Select(c=> from itm in c.EventArgs.Result.ItemList
select new MyDTO()
{
...
});
}
我的理由是,鉴于所有数据都只是在字符串中,将其打包到IEnumerable中是有意义的......但现在我不确定这是否正确!
如果网站响应时间超过 2 秒,我发现 MSTest 崩溃了。调试时,生成的错误是
"异步处理期间出错。唯一状态 对象是多个异步同时操作所必需的 要出类拔萃"
除了内部的例外
"项目已添加。字典中的键:"系统.对象"键 被添加:"系统.对象"
我假设问题是重入问题之一,因为下一个调用在上一个调用完成填充数据之前开始并返回数据。
所以我不确定是否
- 我已经把事情放在一起很正确
- 我应该以某种方式限制连接以避免重新进入。
- 我应该使用不同的中间数据结构(或机制)而不是 IEnumerable
我将不胜感激一些指导。
编辑 1:所以我更改了 Web 调用以包含一个唯一的状态对象
public void StartGetDataAsync()
{
...
// was: _webService.getItemsAsync(request);
_webService.getItemsAsync(request, Guid.NewGuid());
}
并使其工作。但我仍然不确定这是否是正确的方法
编辑 2 - Web 服务 sigs:我正在使用webServiceApi类包装的soap Web服务。创建的引用.cs包含以下方法
public void getItemsAsync(GetItemsReq request, object userState)
{
if ((this.getItemsOperationCompleted == null))
{
this.getItemsOperationCompleted = new System.Threading.SendOrPostCallback(this.OngetItemsOperationCompleted);
}
this.InvokeAsync("getItems", new object[] {
request}, this.getItemsOperationCompleted, userState);
}
private System.Threading.SendOrPostCallback getItemsOperationCompleted;
public event getItemsCompletedEventHandler getItemsCompleted;
public delegate void getItemsCompletedEventHandler(object sender, getItemsCompletedEventArgs e);
public partial class getItemsCompletedEventArgs : System.ComponentModel.AsyncCompletedEventArgs
{
...
}
private void OngetItemsOperationCompleted(object arg)
{
if ((this.getItemsCompleted != null))
{
System.Web.Services.Protocols.InvokeCompletedEventArgs invokeArgs = ((System.Web.Services.Protocols.InvokeCompletedEventArgs)(arg));
this.getItemsCompleted(this, new getItemsCompletedEventArgs(invokeArgs.Results, invokeArgs.Error, invokeArgs.Cancelled, invokeArgs.UserState));
}
}
可能给了你太多(或错过了什么)!
感谢
我为你提供了一个不错的起点。
基本上,我认为您需要抽象出Web服务的复杂性,并创建一个漂亮的干净函数来获得结果。
尝试这样的事情:
Func<GetItemsReq, IObservable<getItemsCompletedEventArgs>> fetch =
rq =>
Observable.Create<getItemsCompletedEventArgs>(o =>
{
var cx = new WebserviceAPI(/* ... */);
var state = new object();
var res =
Observable
.FromEventPattern<
getItemsCompletedEventHandler,
getItemsCompletedEventArgs>(
h => cx.getItemsCompleted += h,
h => cx.getItemsCompleted -= h)
.Where(x => x.EventArgs.UserState == state)
.Take(1)
.Select(x => x.EventArgs);
var subscription = res.Subscribe(o);
cx.getItemsAsync(rq, state);
return subscription;
});
就个人而言,我会更进一步,定义一个返回类型,比如GetItemsReq
,它不包括用户状态对象,但基本上与getItemsCompletedEventArgs
相同。
然后,您应该能够使用 Observable.Interval
来创建所需的轮询。
实现了IDisposable
则您应该在上述函数中添加一个Observable.Using
调用,以便在 Web 服务完成时正确释放它。
让我知道这是否有帮助。
谜有一个很好的解决方案。
var state = new object(); cx.getItemsAsync(rq, state);
这些陈述是解决错误的关键。
每次调用 Async 方法时,它都需要传递一个唯一的对象(当然,除非一次只有一个 Async 方法运行,这是极不可能的)。我为此挠了几个小时,直到我发现你可以传递一个对象参数。如果没有该参数,它将看到多个方法作为同一调用,并为您提供"项目已添加。字典中的键:"系统.对象" 正在添加的键:"系统.对象"消息。