网络流写阻塞

本文关键字:网络 | 更新日期: 2023-09-27 17:51:21

我正在开发一个c#应用程序(.net 4),它可以接受来自不同客户端的多个tcp连接。只有一个接受套接字的tcp侦听器。b/w节点双工通信。数据使用Networkstream发送。写入方法和读取使用Networkstream。阅读方法。对于每个tcp连接,将创建一个单独的线程。

问题是,几天前我们注意到其中一个客户端停止读取数据(由于错误)20分钟。由于连接没有断开,因此服务器上没有(IO)异常。但是,我们注意到其他客户机上的数据也没有移动。20分钟后,该客户端再次开始接收数据,很快其他客户端也开始接收数据。

我知道网络流的写方法是一个阻塞方法,我们没有使用任何超时。因此有可能出现写阻塞(此处有描述)。但正如我所理解的,每个tcp连接必须有一个单独的写缓冲区,或者有更多的东西在起作用。发送阻塞在一个tcp连接,可以影响其他tcp连接在同一应用程序?

这是写操作的伪代码。对于每个连接,都有一个由单独线程执行的单独的传出队列进程。

public class TCPServerListener : baseConnection
{
    private readonly int _Port;
    private TcpListener _tcpListener;
    private Thread _thread;
    private List<TcpClientData> _tcpClientDataList = new List<TcpClientData>();
    private long _messageDiscardTimeout;
    private bool LoopForClientConnection = true;
    public TCPServerListener(int port, ThreadPriority threadPriority)
    {
        try
        {
            // init property
        }
        catch (Exception ex)
        {
            // log
        }
    }
    public void SendMessageToAll(int type)
    {
        base.EnqueueMessageToSend(type, _tcpClientDataList);
    }
    public void SendMessageToList(int type, IList<TcpClient> tcpClientList)
    {
        base.EnqueueMessageToSend(type, tcpClientList);
    }
    public void SendMessage(int type, TcpClient tcpClient)
    {
        base.EnqueueMessageToSend(type, tcpClient);
    }

    private void AcceptClientConnections()
    {
        while (LoopForClientConnection)
        {
            try
            {
                Socket socket = _tcpListener.AcceptSocket();
                TcpClientData tcpClientData = new TcpClientData();
                tcpClientData.tcpClientThread = new Thread(new ParameterizedThreadStart(StartAsync));
                tcpClientData.tcpClientThread.Priority = _threadPriority;
                tcpClientData.tcpClientThread.IsBackground = true;
                tcpClientData.tcpClientThread.Name = "CD" + tcpClientData.tcpClientThread.ManagedThreadId;
                tcpClientData.tcpClient = new TcpClient();
                tcpClientData.tcpClient.Client = socket;
                _tcpClientDataList.Add(tcpClientData);
                tcpClientData.tcpClientThread.Start(tcpClientData.tcpClient);
            }
            catch (ThreadAbortException ex)
            {
                //log
            }
            catch (Exception ex)
            {
                //log
            }
        }
    }


    public override void Start()
    {
        base.Start();
        _tcpListener = new TcpListener(System.Net.IPAddress.Any, _Port);
        _thread = new Thread(AcceptClientConnections);
        _thread.Priority = _threadPriority;
        _thread.IsBackground = true;
        _tcpListener.Start();
        _thread.Start();
    }
    public override void Stop()
    {
       // stop listener and terminate threads
    }
}

public class baseConnection
{
    private Thread _InCommingThread;
    private Thread _OutGoingThread;
    protected ThreadPriority _threadPriority;
    protected BlockingCollection<MessageReceived> _InComingMessageQueue = new BlockingCollection<MessageReceived>();
    protected BlockingCollection<MessageToSend> _OutgoingMessageQueue = new BlockingCollection<MessageToSend>();
    public void StartAsync(Object oTcpClient)
    {
        TcpClient tcpClient = oTcpClient as TcpClient;
        if (tcpClient == null)
            return;
        using (tcpClient)
        {
            using (NetworkStream stream = tcpClient.GetStream())
            {
                stream.ReadTimeout = Timeout.Infinite;
                stream.WriteTimeout = Timeout.Infinite;
                BinaryReader bodyReader = new BinaryReader(stream);
                while (tcpClient.Connected)
                {
                    try
                    {
                        int messageType = bodyReader.ReadInt32();
                        // checks to verify messages 
                        // enqueue message in incoming queue
                        _InComingMessageQueue.Add(new MessageReceived(messageType, tcpClient));
                    }
                    catch (EndOfStreamException ex)
                    {
                        // log
                        break;
                    }
                    catch (Exception ex)
                    {
                        // log
                        Thread.Sleep(100);
                    }
                }
                //RaiseDisconnected(tcpClient);
            }
        }
    }

    public virtual void Start()
    {
        _InCommingThread = new Thread(HandleInCommingMessnge);
        _InCommingThread.Priority = _threadPriority;
        _InCommingThread.IsBackground = true;
        _InCommingThread.Start();
        _OutGoingThread = new Thread(HandleOutgoingQueue);
        _OutGoingThread.Priority = _threadPriority;
        _OutGoingThread.IsBackground = true;
        _OutGoingThread.Start();
    }

    public virtual void Stop()
    {
       // stop the threads and free up resources
    }
    protected void EnqueueMessageToSend(int type, List<TcpClientData> tcpClientDataList)
    {
        tcpClientDataList.ForEach(x => _OutgoingMessageQueue.Add(new MessageToSend(type, x.tcpClient)));
    }
    protected void EnqueueMessageToSend(int type, IList<TcpClient> tcpClientList)
    {
        foreach (TcpClient tcpClient in tcpClientList)
        {
            _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
        }
    }
    protected void EnqueueMessageToSend(int type, TcpClient tcpClient)
    {
        _OutgoingMessageQueue.Add(new MessageToSend(type, tcpClient));
    }

    private void HandleOutgoingQueue()
    {
        while (true)
        {
            try
            {
                MessageToSend message = _OutgoingMessageQueue.Take();
                if (message.tcpClient.Connected)
                {
                    BinaryWriter writer = new BinaryWriter(message.tcpClient.GetStream());
                    writer.Write(message.type);
                }
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                //_logger.Error(ex.Message, ex);
            }
        }
    }
    private void HandleInCommingMessnge()
    {
        while (true)
        {
            try
            {
                MessageReceived messageReceived = _InComingMessageQueue.Take();
                // handle message
            }
            catch (ThreadAbortException ex)
            {
                // log
                return;
            }
            catch (Exception ex)
            {
                // log
                //_logger.Error(ex.Message, ex);
            }
        }
    }
    public class MessageReceived
    {
        public MessageReceived(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }
        public int type;
        public TcpClient tcpClient;
    }
    public class MessageToSend
    {
        public MessageToSend(int type, TcpClient tcpClient)
        {
            this.tcpClient = tcpClient;
            this.type = type;
        }
        public int type;
        public TcpClient tcpClient;
    }
    public class TcpClientData
    {
        public Thread tcpClientThread;
        public TcpClient tcpClient;
    }
}

网络流写阻塞

您提到为每个连接创建一个单独的线程,但是您所展示的代码似乎能够为任何连接解除消息队列。

如果此代码在多个线程上运行,则程序将在每个线程当前都试图向阻塞连接发送消息时阻塞。如果此循环在多个线程上运行,您可能面临的另一个问题是消息可能不会以相同连接的正确顺序到达。