如何检测NetworkStream何时完成Rx查询
本文关键字:何时完 Rx 查询 NetworkStream 何检测 检测 | 更新日期: 2023-09-27 17:49:34
我使用Rx从NetworkStream中读取,并将结果提供为Hot Observable。即使查询工作得很好,我也不确定基于NetworkStream完成序列的条件是否最合适。我遇到过这样的情况:序列完成了,而另一端的TcpListener还没有完成或关闭连接。
这是查询。如果您能提供一些关于安全终止序列的正确条件的建议,我将不胜感激:
private IDisposable GetStreamSubscription(TcpClient client)
{
return Observable.Defer(() => {
var buffer = new byte[client.ReceiveBufferSize];
return Observable.FromAsync<int>(() => {
return client.GetStream ().ReadAsync (buffer, 0, buffer.Length);
})
.SubscribeOn(NewThreadScheduler.Default)
.Select(x => buffer.Take(x).ToArray());
})
.Repeat()
.TakeWhile(bytes => bytes.Any()) //This is the condition to review
.Subscribe(bytes => {
//OnNext Logic
}, ex => {
//OnError logic
}, () => {
//OnCompleted Logic
});
}
只是为了清楚我的问题,我需要知道检测网络流何时在另一边完成的最佳方法(因为断开连接,错误或其他)。现在我通过调用ReadAsync,直到没有字节返回,但我不知道这是完全安全的。
这是你想要的吗?
private IDisposable GetStreamSubscription(TcpClient client)
{
return Observable
.Defer(() =>
{
var buffer = new byte[client.ReceiveBufferSize];
return Observable.Using(
() => client.GetStream(),
st => Observable.While(
() => st.DataAvailable,
Observable.Start(() =>
{
var bytes = st.Read(buffer, 0, buffer.Length);
return buffer.Take(bytes).ToArray();
})));
})
.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(bytes => {
//OnNext Logic
}, ex => {
//OnError logic
}, () => {
//OnCompleted Logic
});
}
请注意,这将安全地处置流,并在不再有任何可用数据时结束。