发布来自流的解码消息,同时保持Rx中的封装

本文关键字:Rx 封装 消息 解码 | 更新日期: 2023-09-27 18:04:22

我是使用Rx的新手,我一直在尝试重写我的MVC/服务层(不是ASP!)来使用这个令人敬畏的新奇的Rx。我有一个类叫做Remote,它封装了一个NetworkStreamRemote使用Rx来侦听来自NetworkStream的字节,一旦它计算出它收到了完整的消息值数据,它将该数据解码为IMessage

我知道我如何从Remote内部连续使用Rx从Stream读取,但是我如何从Remote将解码的IMessage从该流发布到外部世界?我应该在c#中使用经典的事件风格,并让事件的消费者使用Observable.FromEvent吗?

我只是问,因为我已经读到,IObservable是不打算再实施了

发布来自流的解码消息,同时保持Rx中的封装

我应该在c#中使用经典的事件样式并拥有事件的消费者使用Observable.FromEvent ?

如果你没有被强制这样做,不要使用c#风格的事件来创建API IObservable<T>是一个强大的、通用的、得到广泛支持的接口,它允许我们在轻松管理订阅的同时将事件视为一级公民。即使你的用户不使用Rx,他们也能够比使用c#事件更容易地理解和使用IObservable<T>。他们如何处理这些事件取决于他们自己,但IObservable<T>的抽象更清晰、更简单。

我听说IObservable不再被实现了。

实际上,我们的意思是可能没有理由自己实现IObservable<T>,因为我们有工具为我们创建该类型的实例。

我们有Observable.Create(...),它允许我们从头开始创建可观察对象。我们有不同类型的主题,如Subject<T>, BehaviorSubject<T>, ReplaySubject<T>等,它们可以用作代理,并允许我们向多个消费者多播值,我们有操作符,允许我们将任何IObservable<T>转换/组合成另一种类型或类型的IObservable<T>

但是我如何从该流发布解码IMessage到外部世界从Remote ?

在你的类/接口上公开一个IObservable<T>

public interface IRemote
{
  public IObservable<IMessage> Messages { get; }
}

你可以用很多方法来实现它。首先,您可以这样做,以便每个对Messages的订阅都获得它自己对底层逻辑的订阅…

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;
  public IObservable<IMessage> Message {
    get {
      return message;
    }
  }
}

或者你可以确保对底层逻辑只有一个订阅…

public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;
  private IObservable<IMessage> _refCountedMessages
    = this._messages
        .Publish()
        .RefCount();
  public IObservable<IMessage> Message {
    get {
      return this._refCountedMessages;
    }
  }
}

或者您可以使连接过程在本质上非常显式。

public interface IRemote
{
  public IObservable<IMessage> Messages { get; }
  public IDisposable Connect();
}
public class Remote : IRemote
{
  private IObservable<IMessage> _messages = ...;
  private IObservable<IMessage> _connectableMessages
    = this._messages
        .Publish();
  public IObservable<IMessage> Message {
    get {
      return this._connectableMessages;
    }
  }
  public IDisposable Connect()
  {
    return this._connectableMessages.Connect();
  }
}

我想你的问题类似于这个问题如何"重建线条"。使用Rx从串行端口读取的数据?

您将获得字节,而不是将字符串推送给您,然后将其更改为消息。没问题,您可以使用相同的WindowBy概念将字节序列分割成窗口,然后可以将其翻译/转换/映射/任何内容转换为IMessage

基于Christopher Harris的回答。下面是他提出的接口的实现。这里的要点是,您可以公开可观察序列,这些序列只是建立在底层可观察序列之上的查询。在这种情况下,消息序列只是网络序列上的查询。通过分层,我们得到了消费者想要的抽象层次。

void Main()
{
    var networkStream = new NetworkStream();
    var remote = new Remote(networkStream);
    remote.GetMessages().Dump("remote.GetMessages()");
}
// Define other methods and classes here
public class NetworkStream
{
    //Fake getting bytes off the wire or disk
    public IObservable<byte> GetNetworkStream()
    {
        var text = @"Line 1.
Hello line 2.
3rd and final line!";
        return Observable.Zip(
            UTF8Encoding.UTF8.GetBytes(text).ToObservable(),
            Observable.Interval(TimeSpan.FromMilliseconds(100)),
            (character, time)=>character);
    }
}
public interface IMessage
{
    string Content {get;}
}
public class Message : IMessage
{
    public Message(string content)
    {
        Content = content;
    }
    public string Content {get; private set;}
}
public interface IRemote
{
    IObservable<IMessage> GetMessages();
}
public class Remote : IRemote
{
    private readonly NetworkStream _networkStream;
    private readonly byte[] _delimiter = UTF8Encoding.UTF8.GetBytes(Environment.NewLine);
    public Remote(NetworkStream networkStream)
    {
        _networkStream = networkStream;
    }
    public IObservable<IMessage> GetMessages()
    {
    return  _networkStream.GetNetworkStream()
                            .WindowByExclusive(b => _delimiter.Contains(b))
                            .SelectMany(window=>window.ToArray().Select(bytes=>UTF8Encoding.UTF8.GetString(bytes)))
                            .Select(content=>new Message(content));
    }
    //TODO Add IDispose and clean up your NetworkStream
}
public static class ObservableEx
{
    public static IObservable<IObservable<T>> WindowByExclusive<T>(this IObservable<T> input, Func<T, bool> isWindowBoundary)
    {
        return Observable.Create<IObservable<T>>(o=>
        {
            var source = input.Publish().RefCount();
            var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default);
            return left.GroupJoin(
                            source.Where(c=>!isWindowBoundary(c)),
                            x=>source.Where(isWindowBoundary),
                            x=>Observable.Empty<Unit>(),
                            (_,window)=>window)
                        .Subscribe(o);
        });
    }
}