如何检测NetworkStream何时完成Rx查询

本文关键字:何时完 Rx 查询 NetworkStream 何检测 检测 | 更新日期: 2023-09-27 17:49:34

我使用Rx从NetworkStream中读取,并将结果提供为Hot Observable。即使查询工作得很好,我也不确定基于NetworkStream完成序列的条件是否最合适。我遇到过这样的情况:序列完成了,而另一端的TcpListener还没有完成或关闭连接。

这是查询。如果您能提供一些关于安全终止序列的正确条件的建议,我将不胜感激:

private IDisposable GetStreamSubscription(TcpClient client)
{
    return Observable.Defer(() => {
      var buffer = new byte[client.ReceiveBufferSize];
      return Observable.FromAsync<int>(() => {
         return client.GetStream ().ReadAsync (buffer, 0, buffer.Length);
      })
      .SubscribeOn(NewThreadScheduler.Default)
      .Select(x => buffer.Take(x).ToArray());
    })
    .Repeat()
    .TakeWhile(bytes => bytes.Any()) //This is the condition to review
    .Subscribe(bytes => {
        //OnNext Logic
    }, ex => {
        //OnError logic
    }, () => {
        //OnCompleted Logic
    });
}

只是为了清楚我的问题,我需要知道检测网络流何时在另一边完成的最佳方法(因为断开连接,错误或其他)。现在我通过调用ReadAsync,直到没有字节返回,但我不知道这是完全安全的。

如何检测NetworkStream何时完成Rx查询

这是你想要的吗?

private IDisposable GetStreamSubscription(TcpClient client)
{
    return Observable
        .Defer(() =>
        {
            var buffer = new byte[client.ReceiveBufferSize];
            return Observable.Using(
                () => client.GetStream(),
                st => Observable.While(
                    () => st.DataAvailable,
                    Observable.Start(() =>
                    {
                        var bytes = st.Read(buffer, 0, buffer.Length);
                        return buffer.Take(bytes).ToArray();
                    })));
        })
        .SubscribeOn(NewThreadScheduler.Default)
        .Subscribe(bytes => {
            //OnNext Logic
        }, ex => {
            //OnError logic
        }, () => {
            //OnCompleted Logic
        });
}

请注意,这将安全地处置流,并在不再有任何可用数据时结束。