使用RX创建IO绑定的可观察对象

本文关键字:观察 对象 绑定 RX 创建 IO 使用 | 更新日期: 2023-09-27 18:00:53

我有一个工作线程正在进行阻塞调用(ReadFrame(以从套接字读取传入数据(IO绑定(。线程运行将数据馈送到CCD_,消费者可以观察到。

private void ReadLoop()
{
    while (!IsDisposed)
    {
        var frame = _Socket.ReadFrame();
        _ReceivedFrames.OnNext(frame);
    }
}

我想知道是否有一种更RX的方式可以做到这一点。

以下是我的一个尝试(玩具示例(:

var src = Observable
         .Repeat(Unit.Default)
         .Select(_ =>
         {
             Thread.Sleep(1000);              // simulated blocking ReadFrame call
             return "data read from socket";
         })
         .SubscribeOn(ThreadPoolScheduler.Instance) // avoid blocking when subscribing
         .ObserveOn(NewThreadScheduler.Default)     // spin up new thread (?)
         .Publish()
         .RefCount();
var d = src.Subscribe(s => s.Dump()); // simulated consumer
Console.ReadLine();  // simulated main running code
d.Dispose();  // tear down

我正在努力正确使用ObserveOnSubscribeOn和调度器。这个玩具示例似乎有效,但我不确定线程的生存期是否得到了正确的管理。

读卡器线程是否已通过d.Dispose()调用关闭
我需要创建一个新线程吗
我应该使用Observable.Create吗?怎样

以下是@Enigmatility:要求的额外信息

ReadLoop()方法是符合以下接口的类的一部分:

public interface ICanSocket : IDisposable
{
    IObservable<CanFrame> ReceivedFrames { get; }
    IObserver<CanFrame>   FramesToSend   { get; }
}

其成员_Socket在父ICanSocket被布置时被布置(闭合(。

使用RX创建IO绑定的可观察对象

实现这一点的最"Rxy"的方法是使用Rxx,它有可观察的异步I/O方法。

看来你主要关心的是:

  • 订阅时,不要阻塞订阅线程(也就是在后台线程上运行I/O线程(
  • 当调用方取消订阅时,停止I/O线程

解决这些问题的一种方法是只使用异步创建方法:

// just use Task.Run to "background" the work
var src = Observable
    .Create<CanFrame>((observer, cancellationToken) => Task.Run(() =>
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var frame = _Socket.ReadFrame();
            if (frame == null) // end of stream?
            {
                // will send a Completed event
                return;
            }
            observer.OnNext(frame);
        }
    }));
var d = src.Subscribe(s => s.Dump());
Console.ReadLine();
d.Dispose();