From applepi, 1 Year ago, written in C.
  1.  
  2. uint8_t UDPSessionManager::Setup(uint16_t rx_port, uint16_t tx_port)
  3. {
  4.  
  5.     std::stringstream ss;
  6.     ss << "UDP session manager, setup ports.";
  7.     Logger::Info(ss.str());
  8.  
  9.     tx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
  10.     rx_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
  11.  
  12.     if (rx_socket_fd < 0)
  13.     {
  14.         Logger::Error("Could not open an rx UDP socket!");
  15.     }
  16.     else
  17.     {
  18.         std::cout << "rx_socket_fd is " << rx_socket_fd << "\n";
  19.     }
  20.     if (tx_socket_fd < 0)
  21.     {
  22.         Logger::Error("Could not open an tx UDP socket!");
  23.     }
  24.     else
  25.     {
  26.         std::cout << "tx_socket_fd is " << tx_socket_fd << "\n";
  27.     }
  28.  
  29.  
  30.     int reuse = 1;
  31.     if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
  32.         Logger::Warn("Could not set socket reuse!");
  33.  
  34.     #ifdef SO_REUSEPORT
  35.     reuse = 1;
  36.         if (setsockopt(tx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
  37.             Logger::Warn("setsockopt(SO_REUSEPORT) failed");
  38.     #endif
  39.  
  40.     reuse = 1;
  41.     if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&reuse, sizeof(reuse)) < 0)
  42.         Logger::Warn("Could not set socket reuse!");
  43.  
  44.     #ifdef SO_REUSEPORT
  45.     reuse = 1;
  46.         if (setsockopt(rx_socket_fd, SOL_SOCKET, SO_REUSEPORT, (const char*)&reuse, sizeof(reuse)) < 0)
  47.             Logger::Warn("setsockopt(SO_REUSEPORT) failed");
  48.     #endif
  49.  
  50.     memset(&tx_sockaddr, 0, sizeof(tx_sockaddr));
  51.     memset(&rx_sockaddr, 0, sizeof(rx_sockaddr));
  52.  
  53.     tx_sockaddr.sin_family = AF_INET;
  54.     // TODO: update this later to be from the config
  55.     tx_sockaddr.sin_addr.s_addr = INADDR_ANY;
  56.     tx_sockaddr.sin_port = htons(tx_port);
  57.  
  58.     rx_sockaddr.sin_family = AF_INET;
  59.     // TODO: update this later to be from the config
  60.     rx_sockaddr.sin_addr.s_addr = INADDR_ANY;
  61.     rx_sockaddr.sin_port = htons(rx_port);
  62.  
  63.     int rva = 0;
  64.  
  65.     rva = bind(tx_socket_fd, (const struct sockaddr *) &tx_sockaddr, sizeof(tx_sockaddr) );
  66.  
  67.     if (rva < 0)
  68.     {
  69.         std::stringstream ss;
  70.         ss << "UDP SessionManager: Could not bind to tx socket (bind returned error code " << rva << ", errno is " << errno << ")";
  71.         Logger::Error(ss.str());
  72.     }
  73.  
  74.     rva = bind(rx_socket_fd, (const struct sockaddr *) &rx_sockaddr, sizeof(rx_sockaddr) );
  75.  
  76.     if (rva < 0)
  77.     {
  78.         std::stringstream ss;
  79.         ss << "UDP SessionManager: Could not bind to rx socket (bind returned error code " << rva << ", errno is " << errno << ")";
  80.         Logger::Error(ss.str());
  81.     }
  82.  
  83.     return NO_ERROR;
  84. }
  85.  
  86.  
  87. uint8_t UDPSessionManager::SendTelemetry(const TelemetryBase * telemetry)
  88. {
  89.     const uint8_t * bytes = EncodeTelemetryToSend(telemetry);
  90.  
  91.     if (bytes == NULL)
  92.     {
  93.         Logger::Error("UDPSessionManager: Something went wrong trying to encode the telemetry.");
  94.         return 1;
  95.     }
  96.  
  97.     const UDPHeader * header = (const UDPHeader * ) bytes;
  98.     uint16_t numBytes = header->length;
  99.  
  100.     std::stringstream ss;
  101.     ss << "UDPSessionManager::SendTelemetry - bytesToWrite is " << numBytes << "\n";
  102.     Logger::Info(ss.str());
  103.  
  104.     int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
  105.  
  106.     std::this_thread::sleep_for(std::chrono::milliseconds(10));
  107.  
  108.     if (rva == -1  && errno == EINVAL)
  109.     {
  110.         ss.clear();
  111.         ss << "invalid argument!";
  112.         Logger::Warn(ss.str());
  113.     }
  114.     else if (rva < 0)
  115.     {
  116.         ss.clear();
  117.  
  118.         ss << "Failed to write to the UDP port, errno is " << errno;
  119.  
  120.         Logger::Warn(ss.str());
  121.         return 1;
  122.     }
  123.  
  124.     delete bytes;
  125.  
  126.     return 0;
  127. }
  128.  
  129.  
  130.  
  131. uint8_t UDPSessionManager::SendCommand(const CommandBase * command)
  132. {
  133.     const uint8_t * bytes = EncodeCommandToSend(command);
  134.  
  135.     if (bytes == NULL)
  136.     {
  137.         Logger::Error("UDPSessionManager: Something went wrong trying to encode the message.");
  138.         return 1;
  139.     }
  140.  
  141.     const UDPHeader * header = (const UDPHeader * ) bytes;
  142.     uint16_t numBytes = header->length;
  143.  
  144.     std::stringstream ss;
  145.     ss << "UDPSessionManager::SendCommand - bytesToWrite is " << numBytes << "\n";
  146.     Logger::Info(ss.str());
  147.  
  148.     int rva = sendto(tx_socket_fd, (const char *) bytes, numBytes, 0, (const struct sockaddr *) &tx_sockaddr, sizeof(struct sockaddr_in) );
  149.  
  150.     std::this_thread::sleep_for(std::chrono::milliseconds(10));
  151.  
  152.     if (rva < 0)
  153.     {
  154.         ss.clear();
  155.  
  156.         ss << "Failed to write to the UDP port, errno is " << errno;
  157.  
  158.         Logger::Warn(ss.str());
  159.         return 1;
  160.     }
  161.  
  162.     delete bytes;
  163.  
  164.     return 0;
  165. }
  166.  
  167. uint8_t UDPSessionManager::Receive()
  168. {
  169.     uint8_t inputBuffer[UDP_BUFFER_BYTES];
  170.     memset(inputBuffer, '\0', UDP_BUFFER_BYTES);
  171.  
  172.     int totalBytesRead = 0;
  173.  
  174.     //socklen_t addressLength = sizeof(rx_sockaddr);
  175.     struct sockaddr_in sender;
  176.     socklen_t len;
  177.  
  178.     totalBytesRead = recvfrom(rx_socket_fd, (char *) inputBuffer, UDP_BUFFER_BYTES,
  179.                           MSG_DONTWAIT, (struct sockaddr *)  &sender, &len );
  180.  
  181.     if ( totalBytesRead >= 0 )
  182.     {
  183.         std::stringstream ss;
  184.         ss << "UDP port read " << totalBytesRead << " bytes";
  185.         Logger::Info(ss.str() );
  186.  
  187.         const CommandBase * command = DecodeReceivedCommand(inputBuffer);
  188.  
  189.         if (command == NULL)
  190.         {
  191.             Logger::Warn("Failed to decode received command from commanding app.");
  192.             return UDP_ERROR_DECODE_FAILED;
  193.         }
  194.  
  195.         EnqueCommand(command);
  196.    
  197.     }
  198.     else
  199.     {
  200.         std::stringstream ss;
  201.         ss << "UDP port rva = " << totalBytesRead << ", errno is " << errno;
  202.         Logger::Debug(ss.str());
  203.     }
  204.  
  205.     return UDP_ERROR_NO_ERROR;
  206. }
  207.  
  208.  
  209.  
  210. void UDPSessionManager::ProcessingLoopThread()
  211. {
  212.     while ( GetState() == STATE_RUN )
  213.     {
  214.         const TelemetryBase * telemetry = DequeTelemetry();
  215.  
  216.         while (telemetry != NULL)
  217.         {
  218.             std::stringstream ss;
  219.             ss << "UDPSessionManager sending telemetry with ID: " << telemetry->GetTelemetryID();
  220.             Logger::Debug(ss.str());
  221.  
  222.             SendTelemetry(telemetry);
  223.             delete telemetry;
  224.             telemetry = DequeTelemetry();
  225.         }
  226.  
  227.         Receive();
  228.  
  229.         std::this_thread::sleep_for(std::chrono::milliseconds(10));
  230.     }
  231. }
  232.  
captcha