可观察的 TcpListener 在单个连接后终止

本文关键字:连接 终止 单个 观察 TcpListener | 更新日期: 2023-09-27 18:31:44

我是Rx的新手,所以我可能在这里犯了一些重要的错误。

我想创建一个非常简单的套接字服务器,可以使用可观察量接收来自客户端的消息。为此,我使用的是Rxx,它在System.Net.Sockets命名空间中提供了扩展方法,还提供了ObserableTcpListener静态工厂类。

这是我到目前为止所拥有的,几乎是从各种来源偷来的:

IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);
IObservable<TcpClient> clients = listener
    .StartSocketObservable(1)
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket));
    .Finally(listener.Stop)
clients.Subscribe(client =>
{
    OnConnect(client).Subscribe(
        message => OnMessage(client, message),
        ex => OnException(client, ex),
        () => OnCompleted(client));
});
private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
    TcpClient client = new TcpClient();
    client.Client = socket;
    return Observable.Return<TcpClient>(client);
}
private static IObservable<byte[]> OnConnect(TcpClient client)
{
    return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}
private static void OnMessage(TcpClient client, byte[] message)
{
    Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message));
}
private static void OnCompleted(TcpClient client)
{
    Console.WriteLine("Completed.");
}
private static void OnException(TcpClient client, Exception ex)
{
    Console.WriteLine("Exception: {0}", ex.ToString());
}

这行得通...在一定程度上。我可以建立单个客户端连接。一旦该连接终止,似乎可观察序列就会终止并调用.Finally(listener.Stop)。显然,这不是我想要的。

我尝试使用ObserableTcpListener.Start()工厂类,但这给我带来了完全相同的结果。

IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
    OnConnect(client).Subscribe(
        message => OnMessage(client, message),
        ex => OnException(client, ex),
        () => OnCompleted(client));
});

我想我确实理解这里的问题:在第一个客户端终止后,clients可观察序列只是空的,因此调用了.Finally(listener.Stop)

我需要做些什么来规避这种情况?如何继续侦听传入连接?

可观察的 TcpListener 在单个连接后终止

让你的Observable起来,并在有订阅时坚持下去。

IObservable<TcpClient> clients = listener
    .StartSocketObservable(1)
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
    .Finally(listener.Stop)
    .Publish().RefCount();