在SignalR集线器中创建Rx.NET Observable

本文关键字:Rx NET Observable 创建 SignalR 集线器 | 更新日期: 2023-09-27 17:58:35

我有一个SignalR集线器,它监听客户端请求,并使用Rx.NET观察数据库表,以便在更新可用时将更新发回请求更新的客户端。但是,根据客户端请求,在集线器中创建的Observer实例似乎在方法调用完成后立即被销毁(由GC?);因此,我没有得到任何更新。

这是我当前的Hub实现:

public class BookHub : Hub {
    private readonly BookService _service = new BookService();
    public void RequestBookUpdate(string author) {
        BookObserver observer = new BookObserver(Context.connectionId);
        IDisposable unsubscriber = _service.RequestBookUpdate(author, observer);
    }
}

BookService返回转换为Observable:的LINQ查询

public IDisposable RequestBookUpdate(string author, BookObserver observer) {
    var query = from b in db.Book where b.Author.Contains(author) select b;
    IObservable<Book> observable = query.ToObservable();
    IDisposable unsubscriber = observable.Subscribe(observer);
    return unsubscriber;
}

BookObserver只是将新检索到的项目发送回请求更新的特定客户端(由connectionId标识):

// omissis
private static readonly IHubContext _context = GlobalHost.ConnectionManager.GetHubContext<BookHub>();
private readonly string _connectionId;
public BookObserver(string connectionId) {
    connectionId = _connectionId:
}
public void OnNext(Book value) {
    _context.Clients.Client(_connectionId).foundNewBook(value);
}

我不关心BookService实例被破坏,但我希望BookObserver保持活动状态,因此只有当客户端断开连接时,我才能调用unsubscriber.Dispose()。这可能吗?

在SignalR集线器中创建Rx.NET Observable

Observer接收到对OnComplete的调用时,它将被自动释放。这实际上是一个非常好的模式,因为这意味着你不必像这样手动处理Subscription

Observable.Range(0, 100)
    .Subscribe(...);

Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(10)
    .Subscribe();

因此,为了确保你的观察者在想要被处理之前不会被处理,你可以插入另一个对你的源来说是空的、永远不完整的、可观察的。

IObservable<Book> observable = query.ToObservable()
    .Concat(Observable.Never<Book>());

然而,根据你试图做的事情,最好在其他地方处理,比如客户。