Rx.Net:在释放订阅时执行异步副作用

本文关键字:执行 异步 副作用 Net 释放 Rx | 更新日期: 2023-09-27 18:11:42

我有一个通过串行端口与计算机通信的设备。发送"START"命令后,设备会响应确认并开始监视某些外部活动。然后,它根据该外部活动在串行端口上异步传输一些消息。当设备收到"STOP"命令时,它会以确认作为响应,然后停止发送更多消息(表示外部活动(。

我已经使用冷可观察量实现了启动/停止命令,这些命令执行副作用(在串行端口上发送命令(,并在串行端口上收到 ackowledge 时发出单个Unit.Default。我想构造一个发出与外部活动对应的消息的IObservable,并在订阅时执行"START"副作用,在释放订阅时执行"STOP"副作用。"开始"很容易,我只需要做一个"SelectMany",但我不知道如何执行"停止"。

  class MonitoringDevice
{
    private SerialPort _sp;
    private IObservable<byte> _receivedBytes;
    public IObservable<ExternalActivity> ActivityStream { get; }
    public  MonitoringDevice()
    {
        _sp = new SerialPort("COM1");
        _receivedBytes = Observable
                         .FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
                          h =>
                          {
                              _sp.DiscardInBuffer();
                              _sp.DataReceived += h;
                          },
                          h =>
                          {
                              _sp.DataReceived -= h;
                          })
                         .SelectMany(x =>
                         {
                             byte[] buffer = new byte[1024];
                             var ret = new List<byte>();
                             int bytesRead = 0;
                             do
                             {
                                 bytesRead = _sp.Read(buffer, 0, buffer.Length);
                                 ret.AddRange(buffer.Take(bytesRead));
                             } while ((bytesRead >= buffer.Length));
                             return ret;
                         })
                         .Publish()
                         .RefCount();

        ActivityStream = StartMonitoringAsync()
                         .SelectMany( _receivedBytes.ToActivity());
                         // we need to execute StopMonitoringAsync 
                         // when a subscription to ActivityStream is disposed
        _sp.Open();
    }

    private IObservable<Unit> StartMonitoringAsync()
    {
        return Observable
               .Create<Unit>(
                obs =>
                {
                    _sp.Write("START");
                    return _receivedBytes
                           .ToAcknowledge()
                           .FirstAsync()
                           .Timeout(TimeSpan.FromMilliseconds(1000))
                           .Subscribe(obs);
                });
    }

    private IObservable<Unit> StopMonitoringAsync()
    {
        return Observable
               .Create<Unit>(
                obs =>
                {
                    _sp.Write("STOP");
                    return _receivedBytes
                           .ToAcknowledge()
                           .FirstAsync()
                           .Timeout(TimeSpan.FromMilliseconds(1000))
                           .Subscribe(obs);
                });
    }

}

ExternalActivity只是一个POCO。

ToAcknowledge 是一种扩展方法,返回在设备传输确认时发出Unit.DefaultIObservable。 - 这正在按预期工作;

ToActivity 是一种扩展方法,返回一个IObservable,用于分析传入的串行数据并发出ExternalActivity对象。 - 这正在按预期工作;


编辑:添加了ToAcknowledgeToActivity扩展方法的实现。

public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source)
    {
        return source.Buffer(3, 1)
               .Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK
               .Select(x => Unit.Default);
    }
    public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source)
    {
        return source
               .Publish(ps => ps.Buffer(ps.Where(x => x == 1),             // SOH
                                           bo => ps.Where(x => x == 4)))   // EOT
               .Select(bfr => bfr.Take(bfr.Count - 1).Skip(1))
               .Where(bfr => bfr.Count() == 12)
               .Select(bfr =>
               {
                   var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0);
                   var id = BitConverter.ToInt32(bfr.ToArray(), 8);
                   return new ExternalActivity(timestamp, id);
               });           
    }

Rx.Net:在释放订阅时执行异步副作用

您可以将StartAsync修改为:

private IObservable<Unit> StartAsync(Action unsubscribe)
{
    return
        Observable
            .Create<Unit>(o =>
            {
                var subscription =
                    Observable
                        .Timer(TimeSpan.FromSeconds(1))
                        .Select(_=> Unit.Default)
                        .Subscribe(o);
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(unsubscribe));
            });;
}

然后,您可以注入任何您喜欢的Action unsubscribe

尝试使用以下代码进行测试:

var subscription =
    StartAsync(() => Console.WriteLine("Done"))
    .Subscribe();
Thread.Sleep(3000);
subscription.Dispose();

您将在 3 秒后看到"完成"写入主机。