使用 Rx 轮询网站

本文关键字:网站 Rx 使用 | 更新日期: 2023-09-27 18:35:06

只是想让我的头脑了解Rx

我使用 Rx 每 2 秒轮询一次网站

var results = new List<MyDTO>();
var cx = new WebserviceAPI( ... );
var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); });
var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe( _ => { cx.StartGetDataAsync(); });

(Web 服务 API 公开了一个 getItemsAsync/getItemsCompleted 事件处理程序类型机制,我正在从中创建可观察量)。

当网站返回时,我将响应的"业务部分"解压缩为IE无数的DTO

public IObservable<IEnumerable<MyDTO>> GetDataAsync()
{
    var o = Observable.FromEventPattern<getItemsCompletedEventHandler,getItemsCompletedEventArgs>(
        h => _webService.getItemsCompleted += h,
        h => _webService.getItemsCompleted -= h);
    return o.Select(c=> from itm in c.EventArgs.Result.ItemList
                        select new MyDTO()
                        {
                           ...
                        });
}

我的理由是,鉴于所有数据都只是在字符串中,将其打包到IEnumerable中是有意义的......但现在我不确定这是否正确!

如果网站响应时间超过 2 秒,我发现 MSTest 崩溃了。调试时,生成的错误是

"异步处理期间出错。唯一状态 对象是多个异步同时操作所必需的 要出类拔萃"

除了内部的例外

"项目已添加。字典中的键:"系统.对象"键 被添加:"系统.对象"

我假设问题是重入问题之一,因为下一个调用在上一个调用完成填充数据之前开始并返回数据。

所以我不确定是否

  1. 我已经把事情放在一起很正确
  2. 我应该以某种方式限制连接以避免重新进入。
  3. 我应该使用不同的中间数据结构(或机制)而不是 IEnumerable

我将不胜感激一些指导。

编辑 1:所以我更改了 Web 调用以包含一个唯一的状态对象

public void StartGetDataAsync()
{
    ...
    //  was: _webService.getItemsAsync(request);
    _webService.getItemsAsync(request, Guid.NewGuid());
}

并使其工作。但我仍然不确定这是否是正确的方法

编辑 2 - Web 服务 sigs:我正在使用webServiceApi类包装的soap Web服务。创建的引用.cs包含以下方法

public void getItemsAsync(GetItemsReq request, object userState) 
{
    if ((this.getItemsOperationCompleted == null)) 
    {
        this.getItemsOperationCompleted = new System.Threading.SendOrPostCallback(this.OngetItemsOperationCompleted);
    }
    this.InvokeAsync("getItems", new object[] {
                    request}, this.getItemsOperationCompleted, userState);
}
private System.Threading.SendOrPostCallback getItemsOperationCompleted;
public event getItemsCompletedEventHandler getItemsCompleted;
public delegate void getItemsCompletedEventHandler(object sender, getItemsCompletedEventArgs e);
public partial class getItemsCompletedEventArgs : System.ComponentModel.AsyncCompletedEventArgs 
{
    ...
}
private void OngetItemsOperationCompleted(object arg) 
{
    if ((this.getItemsCompleted != null)) 
    {
        System.Web.Services.Protocols.InvokeCompletedEventArgs invokeArgs = ((System.Web.Services.Protocols.InvokeCompletedEventArgs)(arg));
        this.getItemsCompleted(this, new getItemsCompletedEventArgs(invokeArgs.Results, invokeArgs.Error, invokeArgs.Cancelled, invokeArgs.UserState));
    }
 }

可能给了你太多(或错过了什么)!

感谢

使用 Rx 轮询网站

我想

我为你提供了一个不错的起点。

基本上,我认为您需要抽象出Web服务的复杂性,并创建一个漂亮的干净函数来获得结果。

尝试这样的事情:

Func<GetItemsReq, IObservable<getItemsCompletedEventArgs>> fetch =
    rq =>
        Observable.Create<getItemsCompletedEventArgs>(o =>
        {
            var cx = new WebserviceAPI(/* ... */);
            var state = new object();
            var res =
                Observable
                    .FromEventPattern<
                        getItemsCompletedEventHandler,
                        getItemsCompletedEventArgs>(
                        h => cx.getItemsCompleted += h,
                        h => cx.getItemsCompleted -= h)
                    .Where(x => x.EventArgs.UserState == state)
                    .Take(1)
                    .Select(x => x.EventArgs);
            var subscription = res.Subscribe(o);
            cx.getItemsAsync(rq, state);
            return subscription;
        });

就个人而言,我会更进一步,定义一个返回类型,比如GetItemsReq,它不包括用户状态对象,但基本上与getItemsCompletedEventArgs相同。

然后,您应该能够使用 Observable.Interval 来创建所需的轮询。

如果您的 Web 服务

实现了IDisposable则您应该在上述函数中添加一个Observable.Using调用,以便在 Web 服务完成时正确释放它。

让我知道这是否有帮助。

谜有一个很好的解决方案。

          var state = new object();
          cx.getItemsAsync(rq, state);

这些陈述是解决错误的关键。

每次调用 Async 方法时,它都需要传递一个唯一的对象(当然,除非一次只有一个 Async 方法运行,这是极不可能的)。我为此挠了几个小时,直到我发现你可以传递一个对象参数。如果没有该参数,它将看到多个方法作为同一调用,并为您提供"项目已添加。字典中的键:"系统.对象" 正在添加的键:"系统.对象"消息。