Improve error handling of UDP connections

This commit is contained in:
Carles Fernandez 2024-05-06 18:58:09 +02:00
parent fbc216c1a3
commit 079ef0f07d
No known key found for this signature in database
GPG Key ID: 4C583C52B0C3877D
10 changed files with 138 additions and 84 deletions

View File

@ -14,6 +14,10 @@ All notable changes to GNSS-SDR will be documented in this file.
## [Unreleased](https://github.com/gnss-sdr/gnss-sdr/tree/next)
### Improvements in Interoperability:
- Improved error handling in UDP connections.
### Improvements in Portability:
- Fix building against google-glog 0.7.0.

View File

@ -55,22 +55,24 @@ bool Monitor_Ephemeris_Udp_Sink::write_galileo_ephemeris(const std::shared_ptr<G
outbound_data.append(serdes_gal.createProtobuffer(monitor_gal_eph));
}
for (const auto& endpoint : endpoints)
try
{
socket.open(endpoint.protocol(), error);
try
for (const auto& endpoint : endpoints)
{
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0)
socket.open(endpoint.protocol(), error); // NOLINT(bugprone-unused-return-value)
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0) // this can throw
{
return false;
}
}
catch (boost::system::system_error const& e)
{
return false;
}
}
catch (const boost::system::system_error& e)
{
std::cerr << "Error sending Galileo ephemeris: " << e.what() << '\n';
return false;
}
return true;
}
@ -91,21 +93,23 @@ bool Monitor_Ephemeris_Udp_Sink::write_gps_ephemeris(const std::shared_ptr<Gps_E
outbound_data.append(serdes_gps.createProtobuffer(monitor_gps_eph));
}
for (const auto& endpoint : endpoints)
try
{
socket.open(endpoint.protocol(), error);
try
for (const auto& endpoint : endpoints)
{
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0)
socket.open(endpoint.protocol(), error); // NOLINT(bugprone-unused-return-value)
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0) // this can throw
{
return false;
}
}
catch (boost::system::system_error const& e)
{
return false;
}
}
catch (const boost::system::system_error& e)
{
std::cerr << "Error sending GPS ephemeris: " << e.what() << '\n';
return false;
}
return true;
}

View File

@ -54,21 +54,23 @@ bool Monitor_Pvt_Udp_Sink::write_monitor_pvt(const Monitor_Pvt* const monitor_pv
outbound_data = serdes.createProtobuffer(monitor_pvt);
}
for (const auto& endpoint : endpoints)
try
{
socket.open(endpoint.protocol(), error);
try
for (const auto& endpoint : endpoints)
{
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0)
socket.open(endpoint.protocol(), error); // NOLINT(bugprone-unused-return-value)
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0) // this can throw
{
return false;
}
}
catch (boost::system::system_error const& e)
{
return false;
}
}
catch (boost::system::system_error const& e)
{
std::cerr << "Error sending PVT data: " << e.what() << '\n';
return false;
}
return true;
}

View File

@ -78,20 +78,20 @@ rtl_tcp_signal_source_c::rtl_tcp_signal_source_c(const std::string &address,
return;
}
ip::tcp::endpoint ep(addr, port);
socket_.open(ep.protocol(), ec);
socket_.open(ep.protocol(), ec); // NOLINT(bugprone-unused-return-value)
if (ec)
{
std::cout << "Failed to open socket.\n";
LOG(ERROR) << "Failed to open socket.";
}
socket_.set_option(boost::asio::socket_base::reuse_address(true), ec);
socket_.set_option(boost::asio::socket_base::reuse_address(true), ec); // NOLINT(bugprone-unused-return-value)
if (ec)
{
std::cout << "Failed to set reuse address option: " << ec << '\n';
LOG(WARNING) << "Failed to set reuse address option";
}
socket_.set_option(boost::asio::socket_base::linger(true, 0), ec);
socket_.set_option(boost::asio::socket_base::linger(true, 0), ec); // NOLINT(bugprone-unused-return-value)
if (ec)
{
std::cout << "Failed to set linger option: " << ec << '\n';
@ -99,8 +99,7 @@ rtl_tcp_signal_source_c::rtl_tcp_signal_source_c(const std::string &address,
}
// 3. Connect socket
socket_.connect(ep, ec);
socket_.connect(ep, ec); // NOLINT(bugprone-unused-return-value)
if (ec)
{
std::cout << "Failed to connect to " << addr << ":" << port
@ -113,7 +112,7 @@ rtl_tcp_signal_source_c::rtl_tcp_signal_source_c(const std::string &address,
LOG(INFO) << "Connected to " << addr << ":" << port;
// 4. Set nodelay
socket_.set_option(ip::tcp::no_delay(true), ec);
socket_.set_option(ip::tcp::no_delay(true), ec); // NOLINT(bugprone-unused-return-value)
if (ec)
{
std::cout << "Failed to set no delay option.\n";

View File

@ -18,9 +18,15 @@
#include "nav_message_udp_sink.h"
#include <iostream>
#include <sstream>
#if USE_GLOG_AND_GFLAGS
#include <glog/logging.h>
#else
#include <absl/log/log.h>
#endif
Nav_Message_Udp_Sink::Nav_Message_Udp_Sink(const std::vector<std::string>& addresses, const uint16_t& port) : socket{io_context}
Nav_Message_Udp_Sink::Nav_Message_Udp_Sink(const std::vector<std::string>& addresses, const uint16_t& port)
: socket{io_context}
{
for (const auto& address : addresses)
{
@ -35,22 +41,30 @@ bool Nav_Message_Udp_Sink::write_nav_message(const std::shared_ptr<Nav_Message_P
{
std::string outbound_data = serdes_nav.createProtobuffer(nav_meg_packet);
for (const auto& endpoint : endpoints)
try
{
socket.open(endpoint.protocol(), error);
socket.connect(endpoint, error);
try
for (const auto& endpoint : endpoints)
{
if (socket.send(boost::asio::buffer(outbound_data)) == 0)
socket.open(endpoint.protocol(), error); // NOLINT(bugprone-unused-return-value)
socket.connect(endpoint, error); // NOLINT(bugprone-unused-return-value)
if (error)
{
LOG(WARNING) << "Error connecting to IP address " << endpoint.address()
<< ", port " << static_cast<int>(endpoint.port()) << ": " << error.message();
return false;
}
if (socket.send(boost::asio::buffer(outbound_data)) == 0) // this can throw
{
return false;
}
}
catch (boost::system::system_error const& e)
{
return false;
}
}
catch (const boost::system::system_error& e)
{
std::cerr << "Error sending navigation data: " << e.what() << '\n';
return false;
}
return true;
}

View File

@ -53,22 +53,24 @@ bool Gnss_Synchro_Udp_Sink::write_gnss_synchro(const std::vector<Gnss_Synchro>&
{
outbound_data = serdes.createProtobuffer(stocks);
}
for (const auto& endpoint : endpoints)
{
socket.open(endpoint.protocol(), error);
try
try
{
for (const auto& endpoint : endpoints)
{
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0)
socket.open(endpoint.protocol(), error); // NOLINT(bugprone-unused-return-value)
if (socket.send_to(boost::asio::buffer(outbound_data), endpoint) == 0) // this can throw
{
std::cerr << "Gnss_Synchro_Udp_Sink sent 0 bytes\n";
return false;
}
}
catch (boost::system::system_error const& e)
{
std::cerr << e.what() << '\n';
return false;
}
}
catch (const boost::system::system_error& e)
{
std::cerr << "Error sending data: " << e.what() << '\n';
return false;
}
return true;
}

View File

@ -323,10 +323,10 @@ void TcpCmdInterface::run_cmd_server(int tcp_port)
std::cout << "TcpCmdInterface: Telecommand TCP interface listening on port " << tcp_port << '\n';
boost::asio::ip::tcp::socket socket(context);
acceptor.accept(socket, not_throw);
acceptor.accept(socket, not_throw); // NOLINT(bugprone-unused-return-value)
if (not_throw)
{
std::cerr << "TcpCmdInterface: Error when binding the port in the socket\n";
std::cerr << "TcpCmdInterface: Error when binding the port to the socket: " << not_throw.message() << '\n';
continue;
}

View File

@ -31,6 +31,7 @@ int main(int argc, char *argv[])
unsigned short port = boost::lexical_cast<unsigned short>(argv[1]);
Nav_Msg_Udp_Listener udp_listener(port);
std::cout << "Listening on port " << static_cast<int>(port) << ", press Control+C to exit ...\n";
while (true)
{
@ -45,9 +46,15 @@ int main(int argc, char *argv[])
}
}
}
catch (boost::bad_lexical_cast &)
{
std::cerr << "Error: the argument " << argv[1] << " is not a valid port number.\n";
return 1;
}
catch (std::exception &e)
{
std::cerr << e.what() << '\n';
return 1;
}
return 0;

View File

@ -19,46 +19,67 @@
#include <string>
Nav_Msg_Udp_Listener::Nav_Msg_Udp_Listener(unsigned short port)
: socket{io_service}, endpoint{boost::asio::ip::udp::v4(), port}
: socket{io_service}, endpoint{boost::asio::ip::udp::v4(), port}, connected_socket(true)
{
socket.open(endpoint.protocol(), error); // Open socket.
socket.bind(endpoint, error); // Bind the socket to the given local endpoint.
socket.open(endpoint.protocol(), error); // Open socket
if (error)
{
std::cerr << "Error opening socket: " << error.message() << '\n';
connected_socket = false;
}
if (connected_socket)
{
socket.bind(endpoint, error); // Bind the socket to the given local endpoint.
if (error)
{
std::cerr << "Error binding socket: " << error.message() << '\n';
connected_socket = false;
}
}
}
/**
* !\brief blocking call to read nav_message from UDP port
* \param[out] message navigation message class to contain parsed output
* \return true if message parsed succesfully, false ow
/*
* Blocking call to read nav_message from UDP port
* return true if message parsed succesfully, false ow
*/
bool Nav_Msg_Udp_Listener::receive_and_parse_nav_message(gnss_sdr::navMsg &message)
{
char buff[8192]; // Buffer for storing the received data.
if (connected_socket)
{
char buff[8192]; // Buffer for storing the received data.
// This call will block until one or more bytes of data has been received.
int bytes = socket.receive(boost::asio::buffer(buff));
// This call will block until one or more bytes of data has been received.
int bytes = socket.receive(boost::asio::buffer(buff));
std::string data(&buff[0], bytes);
// Deserialize a stock of Nav_Msg objects from the binary string.
return message.ParseFromString(data);
std::string data(&buff[0], bytes);
// Deserialize a stock of Nav_Msg objects from the binary string.
return message.ParseFromString(data);
}
return false;
}
/*
* !\brief prints navigation message content
* \param[in] message nav message to be printed
* Prints navigation message content
* param[in] message nav message to be printed
*/
void Nav_Msg_Udp_Listener::print_message(gnss_sdr::navMsg &message) const
{
std::string system = message.system();
std::string signal = message.signal();
int prn = message.prn();
int tow_at_current_symbol_ms = message.tow_at_current_symbol_ms();
std::string nav_message = message.nav_message();
if (connected_socket)
{
std::string system = message.system();
std::string signal = message.signal();
int prn = message.prn();
int tow_at_current_symbol_ms = message.tow_at_current_symbol_ms();
std::string nav_message = message.nav_message();
std::cout << "\nNew Data received:\n";
std::cout << "System: " << system << '\n';
std::cout << "Signal: " << signal << '\n';
std::cout << "PRN: " << prn << '\n';
std::cout << "TOW of last symbol [ms]: "
<< tow_at_current_symbol_ms << '\n';
std::cout << "Nav message: " << nav_message << "\n\n";
std::cout << "\nNew data received:\n";
std::cout << "System: " << system << '\n';
std::cout << "Signal: " << signal << '\n';
std::cout << "PRN: " << prn << '\n';
std::cout << "TOW of last symbol [ms]: "
<< tow_at_current_symbol_ms << '\n';
std::cout << "Nav message: " << nav_message << "\n\n";
}
}

View File

@ -31,6 +31,7 @@ private:
boost::asio::ip::udp::socket socket;
boost::system::error_code error;
boost::asio::ip::udp::endpoint endpoint;
bool connected_socket;
};
#endif