c++和c#的实时服务器/客户端应用程序

本文关键字:客户端 应用程序 服务器 实时 c++ | 更新日期: 2023-09-27 17:54:54

我有一个在树莓派上的c++聊天服务器应用程序,它监听客户端并使用pthread在for循环中从一个客户端向另一个客户端发送消息,反之亦然。我使用c#作为客户端。

c#客户端不断地向RPi服务器发送和接收数据(单字节),并记录它们发送和接收数据的时间。

查看日志时间,我可以看到在一个客户端发送数据和第二个客户端接收数据之间有100毫秒左右的延迟。对于我的申请来说,这种延迟是不能接受的。我需要保持在15毫秒以下。

在我的c++程序中,接收和发送字节回客户机之间的时间延迟是1-2毫秒

我不确定我如何编码c#客户端或c++服务器是否有问题。我用RT PREEMPT补丁升级了我的内核,但这并没有影响延迟时间。

如果我在c#程序中向服务器发送一个字节之前设置一个以秒为数量级的随机延迟,那么延迟时间会显着改善-下降到1-2毫秒

是否有一种方法可以优化它,即使在连续发送时,延迟时间也非常小?如果需要的话,我可以把代码贴出来。

**编辑:这是RPi的服务器端代码。

#include <iostream>
#include <stdio.h>
#include <stdlib.h>
#include <cstdlib>
#include <iostream>
#include <cstring>
#include <string.h> // memset
#include <pthread.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include <netdb.h>
#include <vector>
#include <string>
#include <fstream>
#include <iomanip>
#include <algorithm>
#include <sys/time.h>
#include <sched.h>
#include <time.h>
#include <sys/mman.h>
using namespace std;    
int BACKLOG;
#define IP_ADDR "192.168.137.99"
#define PORT "8888"
#define MAXLEN 1
#define MY_PRIORITY (49) /* we use 49 as the PRREMPT_RT use 50
                            as the priority of kernel tasklets
                            and interrupt handler by default */
#define MAX_SAFE_STACK (8*1024) /* The maximum stack size which is
                                   guaranteed safe to access without
                                   faulting */
#define NSEC_PER_SEC    (1000000000) /* The number of nsecs per sec. */
static unsigned int cli_count = 0;
vector<int> cliarray;
vector<vector<unsigned long long> > data;
pthread_attr_t custom_sched_attr;
int fifo_max_prio, fifo_min_prio;
struct sched_param fifo_param;
void stack_prefault(void) {
        unsigned char dummy[MAX_SAFE_STACK];
        memset(dummy, 0, MAX_SAFE_STACK);
        return;
}

void send_message(char *s, int sock){
    int i;
    for(i=0;i<BACKLOG;i++){
        if(cliarray[i]){
            if(cliarray[i] != sock){
                send(cliarray[i], s, 1,0);
            }
        }
    }
}
/* Send message to all clients */
void send_message_all(char *s){
    int k;
    for(k=0;k<BACKLOG;k++){
        if(cliarray[k]){
            send(cliarray[k], s, 1,0);
        }
    }
}

void *handle_conn(void *pnewsock)
{
  int sock = *(int*)pnewsock;

  char client_msg[MAXLEN];
  int read_size;
  struct timeval tv;
  bool looprun = true;
  int clientint;
  vector<unsigned long long> row(4);
  while(looprun ){   

     read_size = recv(sock, client_msg, 1, 0);
     gettimeofday(&tv, NULL);
      unsigned long long milliseconds_recv =(unsigned long long)(tv.tv_sec) * 1000 +(unsigned long long)(tv.tv_usec) / 1000;
     clientint = int(*client_msg);

     client_msg[read_size] = ''0';
     /*  cout << "length of client message: " << strlen(client_msg) << endl;
       cout << "# bytes is : " << read_size << endl;     
      cout << clientint << " received" << endl;*/

    send_message(client_msg,sock);
    gettimeofday(&tv, NULL);
    unsigned long long milliseconds_sent =(unsigned long long)(tv.tv_sec) * 1000 +(unsigned long long)(tv.tv_usec) / 1000;

   row = {clientint, milliseconds_recv, milliseconds_sent, strlen(client_msg)};
   data.push_back(row);

     if (clientint == 100)
      {
        looprun = false;

        break;
      }
  }
  cout << "exit handle -conn " << endl;
  pthread_exit(NULL);
}
int main (int argc, char **argv)
{ 
        struct timespec t;
        struct sched_param param;
        int interval = 50000; /* 50us*/
        /* Declare ourself as a real time task */
        param.sched_priority = MY_PRIORITY;
        if(sched_setscheduler(0, SCHED_FIFO, &param) == -1) {
                perror("sched_setscheduler failed");
                exit(-1);
        }
        /* Lock memory */
        if(mlockall(MCL_CURRENT|MCL_FUTURE) == -1) {
                perror("mlockall failed");
                exit(-2);
        }
        /* Pre-fault our stack */
        stack_prefault();
    int connfd =0, n = 0;
    int *new_sock, sock;

    struct addrinfo hints, *res;
    int reuseaddr = 1; // True 
    // Get the address info 
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    if (getaddrinfo(IP_ADDR, PORT, &hints, &res) != 0) {
        perror("getaddrinfo");
        exit (EXIT_FAILURE);
        //return 1; 
    }
    // Create the socket 
    sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    if (sock == -1) {
        perror("socket");
        exit (EXIT_FAILURE);
       // return 1;
    }
    // Enable the socket to reuse the address 
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(int)) == -1) {
        perror("setsockopt");
        ::close(sock);
        exit (EXIT_FAILURE);
        //shutdown(sock,2);
       // return 1;
    }
    // Bind to the address 
    if (bind(sock, res->ai_addr, res->ai_addrlen) == -1) {
        perror("bind");
        ::close(sock);
        exit (EXIT_FAILURE);
        //shutdown(sock,2);
        //return 0;
    }
    freeaddrinfo(res);
    // Listen 
    if (listen(sock, BACKLOG) == -1) {
        perror("listen");
        exit (EXIT_FAILURE);
       // return 0;
    }
    cout << "Enter # clients: " ;
    cin >> BACKLOG;
     cout << "Enter name of text file (num clients - trial #).txt:" << endl;
    string filename;
    cin >> filename;
    cout << "listening for connections" << endl;
    // Main loop 
    // Main loop 
    bool running = true;
    // Initialize clients 
    while (running)
    {  
      size_t size = sizeof(struct sockaddr_in);
      struct sockaddr_in their_addr;
      int clilen = sizeof(their_addr);
      int newsock = accept(sock, (struct sockaddr*)&their_addr, &size);
      if (newsock == -1) 
      {
        perror("accept");
        exit (EXIT_FAILURE);
       // return -1;
      }
      cli_count++;
      printf("Got a connection from %s on port %d'n", inet_ntoa(their_addr.sin_addr), htons(their_addr.sin_port));
      cliarray.push_back(newsock);
      if (cli_count == BACKLOG)
      {
         cout << "Max clients reached" << endl;
        running = false;
        break;
      }
    }
     ofstream frout("/home/pi/cplus/"+filename,ios::app);
     frout << "recv 't" << "time recv (ms) 't" << "time sent (ms) 't" << "length of msg" << endl;
    /* Send message to all clients that server is ready to accept data */  
    char r = char(cli_count);
    char *mesg = &r;
    send_message_all(mesg);
    cout << "length of mesg: " << strlen(mesg) << endl;
    //pthread_t from_ard_t, *ptr;
    pthread_attr_init(&custom_sched_attr);
    pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED);
    pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO);
    fifo_max_prio = sched_get_priority_max(SCHED_FIFO);
    fifo_param.sched_priority = fifo_max_prio;
    pthread_attr_setschedparam(&custom_sched_attr, &fifo_param);
    pthread_t *ptr;
    ptr =static_cast<pthread_t*>(malloc(sizeof(pthread_t)*cli_count));

    int i;
    for (i=0;i<BACKLOG;i++)
    {
      if (pthread_create(&ptr[i], &custom_sched_attr, handle_conn, (void *)&cliarray[i]) != 0)//was newsock
          {
        fprintf(stderr, "Failed to create thread'n");
        exit (EXIT_FAILURE);
        }
    }
    /*if (pthread_create(&from_ard_t, NULL, from_ard, NULL)!=0)
    {
      fprintf(stderr, "Failed to create thread'n");
    }*/
    //pthread_join(from_ard_t, NULL);
    cout << "Created threads" << endl;
    for(i = 0; i < BACKLOG; i++)
    {
       pthread_join(ptr[i], NULL);
    }
    cout << "joined send/recv threads" << endl; 

    close(sock);
   /* array for timestamp data */
   int numrows = data.size();
    for (int k = 0; k < numrows; k++)
    {
        for (int j = 0; j < 4; j++)
        {
            frout << data[k][j] << "'t";
        }
        frout << endl;
    }

} 
c#客户端代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Net.Sockets;
using System.IO;
//make command line possible to save time info in file
namespace sockclient_cs
{
    public class Program
    {
        private System.Object lockThis = new System.Object();
        const int MAXLEN = 1;
        public bool recvrun = true;
        StringBuilder sb = new StringBuilder();
        public NetworkStream stream;
        string fnrecv;
        string fnsend;
        public int clicount;
        DateTime centuryBegin = new DateTime(1970, 1, 1);
        Random rndseed;
        public Program(NetworkStream streamer, int clinum, string pathsend, string pathrecv, Random rand)
        {
            stream = streamer;
            clicount = clinum;
            fnrecv = pathrecv;
            fnsend = pathsend;
            rndseed = rand;
        }

        public void SendData()
        {
            int[] numarray = new int[] { 70, 80, 90, 100, 60, 50, 40, 30}; // coressponds to %, A, P, _, d
            bool looprun = true;
            while (looprun)
            {            
                int rnd1 = rndseed.Next(0, numarray.Length);
                byte[] writebyte = new byte[] { BitConverter.GetBytes(numarray[rnd1])[0] };

               int delay = rndseed.Next(2000,6000);
                Thread.Sleep(delay);
                Array.Reverse(writebyte);

                stream.Write(writebyte, 0, writebyte.Length);
                DateTime currentDate = DateTime.Now;
                long elapsedTicks = currentDate.Ticks - centuryBegin.Ticks;
                Decimal milliseconds = elapsedTicks / (Decimal)TimeSpan.TicksPerMillisecond;
                using (StreamWriter sw = File.AppendText(fnsend))
                {
                    sw.WriteLine(numarray[rnd1] + "'t" + milliseconds + "'n");
                }

                Console.Write("sent: " + numarray[rnd1] + "'n");
                if (numarray[rnd1] == 100)
                {
                    looprun = false;
                    break;
                }
            }
        }
        public void ReceiveData()
        {
            bool recvrun = true;
            int numenders = 0;
            while (recvrun)
            {
                String responseData = String.Empty;
                byte[] bb = new byte[1]; //1 byte of data coming in
                ASCIIEncoding ascii = new ASCIIEncoding();
                int bytes;
                    bytes = stream.Read(bb, 0, bb.Length);
                DateTime currentDate = DateTime.Now;
                long elapsedTicks = currentDate.Ticks - centuryBegin.Ticks;
                Decimal milliseconds = elapsedTicks / (Decimal)TimeSpan.TicksPerMillisecond;
                int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0);
                using (StreamWriter sw = File.AppendText(fnrecv))
                {
                    sw.WriteLine(numback + "'t" + milliseconds + "'n");
                }
                //responseData = ascii.GetString(bb, 0, bytes);



                    Console.WriteLine("Received: " + bb[0] + "'n");
                    if (numback == 100)
                    {
                        numenders++;
                        if (numenders == clicount-1)
                        {
                            recvrun = false;
                            break;
                        }
                    }



            }
            Console.Write("Exiting receive");
        }
    }
    public class Simple
    {

        public static void Main()
        {
            Console.Write("Enter name of recv data file (ex. cli1recv_1.txt):'n");
            string recvfile = Console.ReadLine();
            string pathrecv = @"C:'Users'Neha'Documents'Ayaz Research'" + recvfile;
            Console.Write("Enter name of send data file (ex. cli4send_1.txt):'n");
            string sendfile = Console.ReadLine();
            string pathsend = @"C:'Users'Neha'Documents'Ayaz Research'" + sendfile;
            using (StreamWriter sw = File.CreateText(pathrecv))
            {
                sw.WriteLine("Received 't Recv time (ms) 'n");
            }
            using (StreamWriter sw = File.CreateText(pathsend))
            {
                sw.WriteLine("Sent 't Sent time (ms) 'n");
            }
            //SerialPort Serial1 = new SerialPort("COM1", 9600, Parity.None, 8, StopBits.One);
            Random seed = new Random((int)DateTime.Now.Ticks & 0x0000FFFF);
            try
            {

                TcpClient tcpclnt = new TcpClient();
                Console.WriteLine("Connecting...");
                tcpclnt.Connect("192.168.137.99", 8888); //address of RPi on arbitrary non privileged port
                Console.WriteLine("Connected");
                NetworkStream stream = tcpclnt.GetStream();
                /*Receive the welcome from server */
                String responseData = String.Empty;
                Byte[] bb = new byte[2]; //1 byte of data coming in
                ASCIIEncoding ascii = new ASCIIEncoding();
                int bytes = stream.Read(bb, 0, bb.Length);
                int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0);
                Console.Write("Received initial message from server: " + bb[0] + "'n");

                /*byte[] writebyte = new byte[] { BitConverter.GetBytes(82)[0] };
                Console.Write("writebyte length is " + writebyte.Length + "'n");
                Array.Reverse(writebyte);
                stream.Write(writebyte, 0, writebyte.Length);

                bytes = stream.Read(bb, 0,bb.Length);
                // convert to string info 
                Console.Write("reading byte length is " + bb.Length + "'n");
                responseData = ascii.GetString(bb, 0, bytes);
                Console.WriteLine("bb[0] is: " + bb[0] + "and bb[1] is: " + bb[1] + "'n");
                int numback = BitConverter.ToInt16(new byte[] { bb[0], 0x00 }, 0);
                Console.WriteLine("Received: " + responseData +  "'n");
                Console.WriteLine("Received: " + numback + "'n");*/
                Program clientObject = new Program(stream,numback,pathsend, pathrecv, seed);
                //non loop format - for cppserv
                ThreadStart sending = new ThreadStart(clientObject.SendData);
                Thread sendThread = new Thread(sending);
                sendThread.Start();
                ThreadStart receiving = new ThreadStart(clientObject.ReceiveData);
                Thread recvThread = new Thread(receiving);
                recvThread.Start();
                sendThread.Join();
                recvThread.Join();
                tcpclnt.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Error...." + e.StackTrace);
            }
        }
    }
}

下面是客户端2发送给客户端1的内容和时间戳。

Sent     Sent time (ms) 
70  1467720189893.1576
80  1467720189912.1587
60  1467720189926.1595
60  1467720189937.1602
50  1467720189949.1608
60  1467720189959.1614
40  1467720189969.162
100 1467720190006.1641

下面是客户端1从客户端2接收到的信息和时间戳。

Received     Recv time (ms) 
70  1467720190016.1647
80  1467720190063.1674
60  1467720190079.1683
60  1467720190109.17
50  1467720190126.171
60  1467720190137.1716
40  1467720190149.1723
100 1467720190161.173

c++和c#的实时服务器/客户端应用程序

在发送端关闭Nagle算法

Socket.NoDelay = true